Merge pull request #116580 from justinsb/applyset_refactor

kubectl prunev2: Refactor the applyset to be more reusable
This commit is contained in:
Kubernetes Prow Robot 2023-03-14 14:13:26 -07:00 committed by GitHub
commit 3c6ad6df1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 152 additions and 92 deletions

View File

@ -313,7 +313,7 @@ func (flags *ApplyFlags) ToOptions(f cmdutil.Factory, cmd *cobra.Command, baseNa
if enforceNamespace && parent.IsNamespaced() { if enforceNamespace && parent.IsNamespaced() {
parent.Namespace = namespace parent.Namespace = namespace
} }
tooling := ApplySetTooling{name: baseName, version: ApplySetToolVersion} tooling := ApplySetTooling{Name: baseName, Version: ApplySetToolVersion}
restClient, err := f.UnstructuredClientForMapping(parent.RESTMapping) restClient, err := f.UnstructuredClientForMapping(parent.RESTMapping)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize RESTClient for ApplySet: %w", err) return nil, fmt.Errorf("failed to initialize RESTClient for ApplySet: %w", err)
@ -467,7 +467,7 @@ func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) {
o.objects, err = r.Infos() o.objects, err = r.Infos()
if o.ApplySet != nil { if o.ApplySet != nil {
if err := o.ApplySet.addLabels(o.objects); err != nil { if err := o.ApplySet.AddLabels(o.objects...); err != nil {
return nil, err return nil, err
} }
} }
@ -510,22 +510,11 @@ func (o *ApplyOptions) Run() error {
} }
if o.ApplySet != nil { if o.ApplySet != nil {
if err := o.ApplySet.FetchParent(); err != nil { if err := o.ApplySet.BeforeApply(infos, o.DryRunStrategy, o.ValidationDirective); err != nil {
return err
}
// Update the live parent object to the superset of the current and previous resources.
// Doing this before the actual apply and prune operations improves behavior by ensuring
// the live object contains the superset on failure. This may cause the next pruning
// operation to make a larger number of GET requests than strictly necessary, but it prevents
// object leakage from the set. The superset will automatically be reduced to the correct
// set by the next successful operation.
for _, info := range infos {
o.ApplySet.AddResource(info.ResourceMapping(), info.Namespace)
}
if err := o.ApplySet.UpdateParent(UpdateToSuperset, o.DryRunStrategy, o.ValidationDirective); err != nil {
return err return err
} }
} }
// Iterate through all objects, applying each one. // Iterate through all objects, applying each one.
for _, info := range infos { for _, info := range infos {
if err := o.applyOneObject(info); err != nil { if err := o.applyOneObject(info); err != nil {
@ -1030,19 +1019,12 @@ func (o *ApplyOptions) PrintAndPrunePostProcessor() func() error {
if o.Prune { if o.Prune {
if cmdutil.ApplySet.IsEnabled() && o.ApplySet != nil { if cmdutil.ApplySet.IsEnabled() && o.ApplySet != nil {
pruner, err := newApplySetPruner(o) if err := o.ApplySet.Prune(ctx, o); err != nil {
if err != nil {
return err
}
if err := pruner.pruneAll(ctx, o.ApplySet); err != nil {
// Do not update the ApplySet. If pruning failed, we want to keep the superset // Do not update the ApplySet. If pruning failed, we want to keep the superset
// of the previous and current resources in the ApplySet, so that the pruning // of the previous and current resources in the ApplySet, so that the pruning
// step of the next apply will be able to clean up the set correctly. // step of the next apply will be able to clean up the set correctly.
return err return err
} }
if err := o.ApplySet.UpdateParent(UpdateToLatestSet, o.DryRunStrategy, o.ValidationDirective); err != nil {
return fmt.Errorf("apply and prune succeeded, but ApplySet update failed: %w", err)
}
} else { } else {
p := newPruner(o) p := newPruner(o)
return p.pruneAll(o) return p.pruneAll(o)

View File

@ -2371,7 +2371,7 @@ func TestApplySetParentManagement(t *testing.T) {
return false, nil, nil return false, nil, nil
}) })
cmdutil.BehaviorOnFatal(func(s string, i int) { cmdutil.BehaviorOnFatal(func(s string, i int) {
if failDeletes && s == `error: pruning /v1, Kind=ReplicationController objects: deleting test/test-rc: an error on the server ("") has prevented the request from succeeding` { if failDeletes && s == `error: pruning ReplicationController test/test-rc: an error on the server ("") has prevented the request from succeeding` {
t.Logf("got expected error %q", s) t.Logf("got expected error %q", s)
} else { } else {
t.Fatalf("unexpected exit %d: %s", i, s) t.Fatalf("unexpected exit %d: %s", i, s)

View File

@ -132,12 +132,12 @@ func (p ApplySetParentRef) String() string {
} }
type ApplySetTooling struct { type ApplySetTooling struct {
name string Name string
version string Version string
} }
func (t ApplySetTooling) String() string { func (t ApplySetTooling) String() string {
return fmt.Sprintf("%s/%s", t.name, t.version) return fmt.Sprintf("%s/%s", t.Name, t.Version)
} }
// NewApplySet creates a new ApplySet object tracked by the given parent object. // NewApplySet creates a new ApplySet object tracked by the given parent object.
@ -226,7 +226,7 @@ func (a *ApplySet) LabelsForMember() map[string]string {
} }
// addLabels sets our tracking labels on each object; this should be called as part of loading the objects. // addLabels sets our tracking labels on each object; this should be called as part of loading the objects.
func (a *ApplySet) addLabels(objects []*resource.Info) error { func (a *ApplySet) AddLabels(objects ...*resource.Info) error {
applysetLabels := a.LabelsForMember() applysetLabels := a.LabelsForMember()
for _, obj := range objects { for _, obj := range objects {
accessor, err := meta.Accessor(obj.Object) accessor, err := meta.Accessor(obj.Object)
@ -249,7 +249,7 @@ func (a *ApplySet) addLabels(objects []*resource.Info) error {
return nil return nil
} }
func (a *ApplySet) FetchParent() error { func (a *ApplySet) fetchParent() error {
helper := resource.NewHelper(a.client, a.parentRef.RESTMapping) helper := resource.NewHelper(a.client, a.parentRef.RESTMapping)
obj, err := helper.Get(a.parentRef.Namespace, a.parentRef.Name) obj, err := helper.Get(a.parentRef.Namespace, a.parentRef.Name)
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
@ -272,8 +272,8 @@ func (a *ApplySet) FetchParent() error {
if !hasToolAnno { if !hasToolAnno {
return fmt.Errorf("ApplySet parent object %q already exists and is missing required annotation %q", a.parentRef, ApplySetToolingAnnotation) return fmt.Errorf("ApplySet parent object %q already exists and is missing required annotation %q", a.parentRef, ApplySetToolingAnnotation)
} }
if managedBy := toolingBaseName(toolAnnotation); managedBy != a.toolingID.name { if managedBy := toolingBaseName(toolAnnotation); managedBy != a.toolingID.Name {
return fmt.Errorf("ApplySet parent object %q already exists and is managed by tooling %q instead of %q", a.parentRef, managedBy, a.toolingID.name) return fmt.Errorf("ApplySet parent object %q already exists and is managed by tooling %q instead of %q", a.parentRef, managedBy, a.toolingID.Name)
} }
idLabel, hasIDLabel := labels[ApplySetParentIDLabel] idLabel, hasIDLabel := labels[ApplySetParentIDLabel]
@ -375,9 +375,9 @@ func parseNamespacesAnnotation(annotations map[string]string) sets.Set[string] {
return sets.New(strings.Split(annotation, ",")...) return sets.New(strings.Split(annotation, ",")...)
} }
// AddResource registers the given resource and namespace as being part of the updated set of // addResource registers the given resource and namespace as being part of the updated set of
// resources being applied by the current operation. // resources being applied by the current operation.
func (a *ApplySet) AddResource(resource *meta.RESTMapping, namespace string) { func (a *ApplySet) addResource(resource *meta.RESTMapping, namespace string) {
a.updatedResources[resource.Resource] = resource a.updatedResources[resource.Resource] = resource
if resource.Scope == meta.RESTScopeNamespace && namespace != "" { if resource.Scope == meta.RESTScopeNamespace && namespace != "" {
a.updatedNamespaces.Insert(namespace) a.updatedNamespaces.Insert(namespace)
@ -386,10 +386,10 @@ func (a *ApplySet) AddResource(resource *meta.RESTMapping, namespace string) {
type ApplySetUpdateMode string type ApplySetUpdateMode string
var UpdateToLatestSet ApplySetUpdateMode = "latest" var updateToLatestSet ApplySetUpdateMode = "latest"
var UpdateToSuperset ApplySetUpdateMode = "superset" var updateToSuperset ApplySetUpdateMode = "superset"
func (a *ApplySet) UpdateParent(mode ApplySetUpdateMode, dryRun cmdutil.DryRunStrategy, validation string) error { func (a *ApplySet) updateParent(mode ApplySetUpdateMode, dryRun cmdutil.DryRunStrategy, validation string) error {
data, err := json.Marshal(a.buildParentPatch(mode)) data, err := json.Marshal(a.buildParentPatch(mode))
if err != nil { if err != nil {
return fmt.Errorf("failed to encode patch for ApplySet parent: %w", err) return fmt.Errorf("failed to encode patch for ApplySet parent: %w", err)
@ -431,14 +431,14 @@ func serverSideApplyRequest(a *ApplySet, data []byte, dryRun cmdutil.DryRunStrat
func (a *ApplySet) buildParentPatch(mode ApplySetUpdateMode) *metav1.PartialObjectMetadata { func (a *ApplySet) buildParentPatch(mode ApplySetUpdateMode) *metav1.PartialObjectMetadata {
var newGRsAnnotation, newNsAnnotation string var newGRsAnnotation, newNsAnnotation string
switch mode { switch mode {
case UpdateToSuperset: case updateToSuperset:
// If the apply succeeded but pruning failed, the set of group resources that // If the apply succeeded but pruning failed, the set of group resources that
// the ApplySet should track is the superset of the previous and current resources. // the ApplySet should track is the superset of the previous and current resources.
// This ensures that the resources that failed to be pruned are not orphaned from the set. // This ensures that the resources that failed to be pruned are not orphaned from the set.
grSuperset := sets.KeySet(a.currentResources).Union(sets.KeySet(a.updatedResources)) grSuperset := sets.KeySet(a.currentResources).Union(sets.KeySet(a.updatedResources))
newGRsAnnotation = generateResourcesAnnotation(grSuperset) newGRsAnnotation = generateResourcesAnnotation(grSuperset)
newNsAnnotation = generateNamespacesAnnotation(a.currentNamespaces.Union(a.updatedNamespaces), a.parentRef.Namespace) newNsAnnotation = generateNamespacesAnnotation(a.currentNamespaces.Union(a.updatedNamespaces), a.parentRef.Namespace)
case UpdateToLatestSet: case updateToLatestSet:
newGRsAnnotation = generateResourcesAnnotation(sets.KeySet(a.updatedResources)) newGRsAnnotation = generateResourcesAnnotation(sets.KeySet(a.updatedResources))
newNsAnnotation = generateNamespacesAnnotation(a.updatedNamespaces, a.parentRef.Namespace) newNsAnnotation = generateNamespacesAnnotation(a.updatedNamespaces, a.parentRef.Namespace)
} }
@ -479,7 +479,7 @@ func generateResourcesAnnotation(resources sets.Set[schema.GroupVersionResource]
} }
func (a ApplySet) FieldManager() string { func (a ApplySet) FieldManager() string {
return fmt.Sprintf("%s-applyset", a.toolingID.name) return fmt.Sprintf("%s-applyset", a.toolingID.Name)
} }
// ParseApplySetParentRef creates a new ApplySetParentRef from a parent reference in the format [RESOURCE][.GROUP]/NAME // ParseApplySetParentRef creates a new ApplySetParentRef from a parent reference in the format [RESOURCE][.GROUP]/NAME
@ -509,3 +509,52 @@ func ParseApplySetParentRef(parentRefStr string, mapper meta.RESTMapper) (*Apply
} }
return &ApplySetParentRef{Name: name, RESTMapping: mapping}, nil return &ApplySetParentRef{Name: name, RESTMapping: mapping}, nil
} }
// Prune deletes any objects from the apiserver that are no longer in the applyset.
func (a *ApplySet) Prune(ctx context.Context, o *ApplyOptions) error {
printer, err := o.ToPrinter("pruned")
if err != nil {
return err
}
opt := &ApplySetDeleteOptions{
CascadingStrategy: o.DeleteOptions.CascadingStrategy,
DryRunStrategy: o.DryRunStrategy,
GracePeriod: o.DeleteOptions.GracePeriod,
Printer: printer,
IOStreams: o.IOStreams,
}
if err := a.pruneAll(ctx, o.DynamicClient, o.VisitedUids, opt); err != nil {
return err
}
if err := a.updateParent(updateToLatestSet, o.DryRunStrategy, o.ValidationDirective); err != nil {
return fmt.Errorf("apply and prune succeeded, but ApplySet update failed: %w", err)
}
return nil
}
// BeforeApply should be called before applying the objects.
// It pre-updates the parent object so that it covers the resources that will be applied.
// In this way, even if we are interrupted, we will not leak objects.
func (a *ApplySet) BeforeApply(objects []*resource.Info, dryRunStrategy cmdutil.DryRunStrategy, validationDirective string) error {
if err := a.fetchParent(); err != nil {
return err
}
// Update the live parent object to the superset of the current and previous resources.
// Doing this before the actual apply and prune operations improves behavior by ensuring
// the live object contains the superset on failure. This may cause the next pruning
// operation to make a larger number of GET requests than strictly necessary, but it prevents
// object leakage from the set. The superset will automatically be reduced to the correct
// set by the next successful operation.
for _, info := range objects {
a.addResource(info.ResourceMapping(), info.Namespace)
}
if err := a.updateParent(updateToSuperset, dryRunStrategy, validationDirective); err != nil {
return err
}
return nil
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
@ -31,101 +32,129 @@ import (
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
) )
type applySetPruner struct { type ApplySetDeleteOptions struct {
dynamicClient dynamic.Interface CascadingStrategy metav1.DeletionPropagation
DryRunStrategy cmdutil.DryRunStrategy
GracePeriod int
visitedUids sets.Set[types.UID] Printer printers.ResourcePrinter
cascadingStrategy metav1.DeletionPropagation IOStreams genericclioptions.IOStreams
dryRunStrategy cmdutil.DryRunStrategy
gracePeriod int
printer printers.ResourcePrinter
ioStreams genericclioptions.IOStreams
} }
func newApplySetPruner(o *ApplyOptions) (*applySetPruner, error) { // PruneObject is an apiserver object that should be deleted as part of prune.
printer, err := o.ToPrinter("pruned") type PruneObject struct {
if err != nil { Name string
return nil, err Namespace string
Mapping *meta.RESTMapping
Object runtime.Object
} }
return &applySetPruner{ // String returns a human-readable name of the object, for use in debug messages.
dynamicClient: o.DynamicClient, func (p *PruneObject) String() string {
s := p.Mapping.GroupVersionKind.GroupKind().String()
visitedUids: o.VisitedUids, if p.Namespace != "" {
s += " " + p.Namespace + "/" + p.Name
cascadingStrategy: o.DeleteOptions.CascadingStrategy, } else {
dryRunStrategy: o.DryRunStrategy, s += " " + p.Name
gracePeriod: o.DeleteOptions.GracePeriod, }
return s
printer: printer,
ioStreams: o.IOStreams,
}, nil
} }
func (p *applySetPruner) pruneAll(ctx context.Context, applyset *ApplySet) error { // FindAllObjectsToPrune returns the list of objects that will be pruned.
// TODO: Split into discovery and deletion, run discovery in parallel (and maybe in consistent order or in parallel?) // Calling this instead of Prune can be useful for dry-run / diff behaviour.
for _, restMapping := range applyset.AllPrunableResources() { func (a *ApplySet) FindAllObjectsToPrune(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID]) ([]PruneObject, error) {
var allObjects []PruneObject
// TODO: Run discovery in parallel (and maybe in consistent order?)
for _, restMapping := range a.AllPrunableResources() {
switch restMapping.Scope.Name() { switch restMapping.Scope.Name() {
case meta.RESTScopeNameNamespace: case meta.RESTScopeNameNamespace:
for _, namespace := range applyset.AllPrunableNamespaces() { for _, namespace := range a.AllPrunableNamespaces() {
if namespace == "" { if namespace == "" {
// Just double-check because otherwise we get cryptic error messages // Just double-check because otherwise we get cryptic error messages
return fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind) return nil, fmt.Errorf("unexpectedly encountered empty namespace during prune of namespace-scoped resource %v", restMapping.GroupVersionKind)
} }
if err := p.prune(ctx, namespace, restMapping, applyset); err != nil { pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, namespace, restMapping)
return fmt.Errorf("pruning %v objects: %w", restMapping.GroupVersionKind.String(), err) if err != nil {
return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
} }
allObjects = append(allObjects, pruneObjects...)
} }
case meta.RESTScopeNameRoot: case meta.RESTScopeNameRoot:
if err := p.prune(ctx, metav1.NamespaceNone, restMapping, applyset); err != nil { pruneObjects, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUids, metav1.NamespaceNone, restMapping)
return fmt.Errorf("pruning %v objects: %w", restMapping.GroupVersionKind.String(), err) if err != nil {
return nil, fmt.Errorf("listing %v objects for prune: %w", restMapping.GroupVersionKind.String(), err)
} }
allObjects = append(allObjects, pruneObjects...)
default: default:
return fmt.Errorf("unhandled scope %q", restMapping.Scope.Name()) return nil, fmt.Errorf("unhandled scope %q", restMapping.Scope.Name())
} }
} }
return nil return allObjects, nil
} }
func (p *applySetPruner) prune(ctx context.Context, namespace string, mapping *meta.RESTMapping, applyset *ApplySet) error { func (a *ApplySet) pruneAll(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID], deleteOptions *ApplySetDeleteOptions) error {
applysetLabelSelector := applyset.LabelSelectorForMembers() allObjects, err := a.FindAllObjectsToPrune(ctx, dynamicClient, visitedUids)
if err != nil {
return err
}
return a.deleteObjects(ctx, dynamicClient, allObjects, deleteOptions)
}
func (a *ApplySet) findObjectsToPrune(ctx context.Context, dynamicClient dynamic.Interface, visitedUids sets.Set[types.UID], namespace string, mapping *meta.RESTMapping) ([]PruneObject, error) {
applysetLabelSelector := a.LabelSelectorForMembers()
opt := metav1.ListOptions{ opt := metav1.ListOptions{
LabelSelector: applysetLabelSelector, LabelSelector: applysetLabelSelector,
} }
klog.V(2).Infof("listing objects for pruning; namespace=%q, resource=%v", namespace, mapping.Resource) klog.V(2).Infof("listing objects for pruning; namespace=%q, resource=%v", namespace, mapping.Resource)
objects, err := p.dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opt) objects, err := dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opt)
if err != nil { if err != nil {
return err return nil, err
} }
var pruneObjects []PruneObject
for i := range objects.Items { for i := range objects.Items {
obj := &objects.Items[i] obj := &objects.Items[i]
uid := obj.GetUID() uid := obj.GetUID()
if p.visitedUids.Has(uid) { if visitedUids.Has(uid) {
continue continue
} }
name := obj.GetName() name := obj.GetName()
if p.dryRunStrategy != cmdutil.DryRunClient { pruneObjects = append(pruneObjects, PruneObject{
if err := p.delete(ctx, namespace, name, mapping); err != nil { Name: name,
return fmt.Errorf("deleting %s/%s: %w", namespace, name, err) Namespace: namespace,
Mapping: mapping,
Object: obj,
})
}
return pruneObjects, nil
}
func (a *ApplySet) deleteObjects(ctx context.Context, dynamicClient dynamic.Interface, pruneObjects []PruneObject, opt *ApplySetDeleteOptions) error {
for i := range pruneObjects {
pruneObject := &pruneObjects[i]
name := pruneObject.Name
namespace := pruneObject.Namespace
mapping := pruneObject.Mapping
if opt.DryRunStrategy != cmdutil.DryRunClient {
if err := runDelete(ctx, namespace, name, mapping, dynamicClient, opt.CascadingStrategy, opt.GracePeriod, opt.DryRunStrategy == cmdutil.DryRunServer); err != nil {
return fmt.Errorf("pruning %v: %w", pruneObject.String(), err)
} }
} }
p.printer.PrintObj(obj, p.ioStreams.Out) opt.Printer.PrintObj(pruneObject.Object, opt.IOStreams.Out)
} }
return nil return nil
} }
func (p *applySetPruner) delete(ctx context.Context, namespace string, name string, mapping *meta.RESTMapping) error {
return runDelete(ctx, namespace, name, mapping, p.dynamicClient, p.cascadingStrategy, p.gracePeriod, p.dryRunStrategy == cmdutil.DryRunServer)
}

View File

@ -1,2 +1,2 @@
namespace/foo unchanged namespace/foo unchanged
error: pruning /v1, Kind=Namespace objects: deleting /bar: an error on the server ("") has prevented the request from succeeding error: pruning Namespace bar: an error on the server ("") has prevented the request from succeeding