Merge pull request #86361 from seans3/apply-refactor

kubectl apply refactor
This commit is contained in:
Kubernetes Prow Robot 2019-12-19 05:51:56 -08:00 committed by GitHub
commit 8c2f2c93bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 706 additions and 512 deletions

View File

@ -7,6 +7,8 @@ go_library(
"apply_edit_last_applied.go",
"apply_set_last_applied.go",
"apply_view_last_applied.go",
"patcher.go",
"prune.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kubectl/pkg/cmd/apply",
importpath = "k8s.io/kubectl/pkg/cmd/apply",

View File

@ -17,14 +17,9 @@ limitations under the License.
package apply
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@ -34,18 +29,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog"
oapi "k8s.io/kube-openapi/pkg/util/proto"
"k8s.io/kubectl/pkg/cmd/delete"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
@ -92,16 +82,30 @@ type ApplyOptions struct {
EnforceNamespace bool
genericclioptions.IOStreams
}
const (
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
maxPatchRetry = 5
// backOffPeriod is the period to back off when apply patch results in error.
backOffPeriod = 1 * time.Second
// how many times we can retry before back off
triesBeforeBackOff = 1
)
// Objects (and some denormalized data) which are to be
// applied. The standard way to fill in this structure
// is by calling "GetObjects()", which will use the
// resource builder if "objectsCached" is false. The other
// way to set this field is to use "SetObjects()".
// Subsequent calls to "GetObjects()" after setting would
// not call the resource builder; only return the set objects.
objects []*resource.Info
objectsCached bool
// Stores visited objects/namespaces for later use
// calculating the set of objects to prune.
VisitedUids sets.String
VisitedNamespaces sets.String
// Function run after the objects are generated and
// stored in the "objects" field, but before the
// apply is run on these objects.
PreProcessorFn func() error
// Function run after all objects have been applied.
// The standard PostProcessorFn is "PrintAndPrunePostProcessor()".
PostProcessorFn func() error
}
var (
applyLong = templates.LongDesc(i18n.T(`
@ -146,6 +150,12 @@ func NewApplyOptions(ioStreams genericclioptions.IOStreams) *ApplyOptions {
Recorder: genericclioptions.NoopRecorder{},
IOStreams: ioStreams,
objects: []*resource.Info{},
objectsCached: false,
VisitedUids: sets.NewString(),
VisitedNamespaces: sets.NewString(),
}
}
@ -269,6 +279,15 @@ func (o *ApplyOptions) Complete(f cmdutil.Factory, cmd *cobra.Command) error {
return err
}
if o.Prune {
o.PruneResources, err = parsePruneResources(o.Mapper, o.PruneWhitelist)
if err != nil {
return err
}
}
o.PostProcessorFn = o.PrintAndPrunePostProcessor()
return nil
}
@ -289,37 +308,6 @@ func validatePruneAll(prune, all bool, selector string) error {
return nil
}
func parsePruneResources(mapper meta.RESTMapper, gvks []string) ([]pruneResource, error) {
pruneResources := []pruneResource{}
for _, groupVersionKind := range gvks {
gvk := strings.Split(groupVersionKind, "/")
if len(gvk) != 3 {
return nil, fmt.Errorf("invalid GroupVersionKind format: %v, please follow <group/version/kind>", groupVersionKind)
}
if gvk[0] == "core" {
gvk[0] = ""
}
mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk[0], Kind: gvk[2]}, gvk[1])
if err != nil {
return pruneResources, err
}
var namespaced bool
namespaceScope := mapping.Scope.Name()
switch namespaceScope {
case meta.RESTScopeNameNamespace:
namespaced = true
case meta.RESTScopeNameRoot:
namespaced = false
default:
return pruneResources, fmt.Errorf("Unknown namespace scope: %q", namespaceScope)
}
pruneResources = append(pruneResources, pruneResource{gvk[0], gvk[1], gvk[2], namespaced})
}
return pruneResources, nil
}
func isIncompatibleServerError(err error) bool {
// 415: Unsupported media type means we're talking to a server which doesn't
// support server-side apply.
@ -330,54 +318,70 @@ func isIncompatibleServerError(err error) bool {
return err.(*errors.StatusError).Status().Code == http.StatusUnsupportedMediaType
}
// GetObjects returns a (possibly cached) version of all the objects to apply
// as a slice of pointer to resource.Info. The resource.Info contains the object
// and some other denormalized data. This function should not be called until
// AFTER the "complete" and "validate" methods have been called to ensure that
// the ApplyOptions is filled in and valid. Returns an error if the resource
// builder returns an error retrieving the objects.
func (o *ApplyOptions) GetObjects() ([]*resource.Info, error) {
if !o.objectsCached {
// include the uninitialized objects by default if --prune is true
// unless explicitly set --include-uninitialized=false
r := o.Builder.
Unstructured().
Schema(o.Validator).
ContinueOnError().
NamespaceParam(o.Namespace).DefaultNamespace().
FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
if err := r.Err(); err != nil {
return nil, err
}
infos, err := r.Infos()
if err != nil {
return nil, err
}
o.objects = infos
o.objectsCached = true
}
return o.objects, nil
}
// SetObjects stores the set of objects (as resource.Info) to be
// subsequently applied.
func (o *ApplyOptions) SetObjects(infos []*resource.Info) {
o.objects = infos
o.objectsCached = true
}
// Run executes the `apply` command.
func (o *ApplyOptions) Run() error {
var openapiSchema openapi.Resources
if o.OpenAPIPatch {
openapiSchema = o.OpenAPISchema
}
dryRunVerifier := &DryRunVerifier{
Finder: cmdutil.NewCRDFinder(cmdutil.CRDFromDynamic(o.DynamicClient)),
OpenAPIGetter: o.DiscoveryClient,
}
// include the uninitialized objects by default if --prune is true
// unless explicitly set --include-uninitialized=false
r := o.Builder.
Unstructured().
Schema(o.Validator).
ContinueOnError().
NamespaceParam(o.Namespace).DefaultNamespace().
FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions).
LabelSelectorParam(o.Selector).
Flatten().
Do()
if err := r.Err(); err != nil {
if o.PreProcessorFn != nil {
klog.V(4).Infof("Running apply pre-processor function")
if err := o.PreProcessorFn(); err != nil {
return err
}
}
// Generates the objects using the resource builder if they have not
// already been stored by calling "SetObjects()" in the pre-processor.
infos, err := o.GetObjects()
if err != nil {
return err
}
var err error
if o.Prune {
o.PruneResources, err = parsePruneResources(o.Mapper, o.PruneWhitelist)
if err != nil {
return err
}
if len(infos) == 0 {
return fmt.Errorf("no objects passed to apply")
}
output := *o.PrintFlags.OutputFormat
shortOutput := output == "name"
visitedUids := sets.NewString()
visitedNamespaces := sets.NewString()
var objs []runtime.Object
count := 0
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
for _, info := range infos {
// If server-dry-run is requested but the type doesn't support it, fail right away.
if o.ServerDryRun {
@ -386,9 +390,7 @@ func (o *ApplyOptions) Run() error {
}
}
if info.Namespaced() {
visitedNamespaces.Insert(info.Namespace)
}
o.MarkNamespaceVisited(info)
if err := o.Recorder.Record(info.Object); err != nil {
klog.V(4).Infof("error recording current command: %v", err)
@ -440,16 +442,13 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
}
info.Refresh(obj, true)
metadata, err := meta.Accessor(info.Object)
if err != nil {
if err := o.MarkObjectVisited(info); err != nil {
return err
}
visitedUids.Insert(string(metadata.GetUID()))
count++
if len(output) > 0 && !shortOutput {
objs = append(objs, info.Object)
return nil
if o.shouldPrintObject() {
continue
}
printer, err := o.ToPrinter("serverside-applied")
@ -457,7 +456,10 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
return err
}
return printer.PrintObj(info.Object, o.Out)
if err = printer.PrintObj(info.Object, o.Out); err != nil {
return err
}
continue
}
// Get the modified configuration of the object. Embed the result
@ -468,9 +470,6 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving modified configuration from:\n%s\nfor:", info.String()), info.Source, err)
}
// Print object only if output format other than "name" is specified
printObject := len(output) > 0 && !shortOutput
if err := info.Get(); err != nil {
if !errors.IsNotFound(err) {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err)
@ -495,54 +494,36 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
info.Refresh(obj, true)
}
metadata, err := meta.Accessor(info.Object)
if err != nil {
if err := o.MarkObjectVisited(info); err != nil {
return err
}
visitedUids.Insert(string(metadata.GetUID()))
count++
if printObject {
objs = append(objs, info.Object)
return nil
if o.shouldPrintObject() {
continue
}
printer, err := o.ToPrinter("created")
if err != nil {
return err
}
return printer.PrintObj(info.Object, o.Out)
if err = printer.PrintObj(info.Object, o.Out); err != nil {
return err
}
continue
}
metadata, err := meta.Accessor(info.Object)
if err != nil {
if err := o.MarkObjectVisited(info); err != nil {
return err
}
visitedUids.Insert(string(metadata.GetUID()))
if !o.DryRun {
metadata, _ := meta.Accessor(info.Object)
annotationMap := metadata.GetAnnotations()
if _, ok := annotationMap[corev1.LastAppliedConfigAnnotation]; !ok {
fmt.Fprintf(o.ErrOut, warningNoLastAppliedConfigAnnotation, o.cmdBaseName)
}
helper := resource.NewHelper(info.Client, info.Mapping)
patcher := &Patcher{
Mapping: info.Mapping,
Helper: helper,
DynamicClient: o.DynamicClient,
Overwrite: o.Overwrite,
BackOff: clockwork.NewRealClock(),
Force: o.DeleteOptions.ForceDeletion,
Cascade: o.DeleteOptions.Cascade,
Timeout: o.DeleteOptions.Timeout,
GracePeriod: o.DeleteOptions.GracePeriod,
ServerDryRun: o.ServerDryRun,
OpenapiSchema: openapiSchema,
Retries: maxPatchRetry,
}
patcher := newPatcher(o, info)
patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
@ -550,46 +531,75 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
info.Refresh(patchedObject, true)
if string(patchBytes) == "{}" && !printObject {
count++
if string(patchBytes) == "{}" && !o.shouldPrintObject() {
printer, err := o.ToPrinter("unchanged")
if err != nil {
return err
}
return printer.PrintObj(info.Object, o.Out)
if err = printer.PrintObj(info.Object, o.Out); err != nil {
return err
}
continue
}
}
count++
if printObject {
objs = append(objs, info.Object)
return nil
if o.shouldPrintObject() {
continue
}
printer, err := o.ToPrinter("configured")
if err != nil {
return err
}
return printer.PrintObj(info.Object, o.Out)
})
if err = printer.PrintObj(info.Object, o.Out); err != nil {
return err
}
}
if o.PostProcessorFn != nil {
klog.V(4).Infof("Running apply post-processor function")
if err := o.PostProcessorFn(); err != nil {
return err
}
}
return nil
}
func (o *ApplyOptions) shouldPrintObject() bool {
// Print object only if output format other than "name" is specified
shouldPrint := false
output := *o.PrintFlags.OutputFormat
shortOutput := output == "name"
if len(output) > 0 && !shortOutput {
shouldPrint = true
}
return shouldPrint
}
func (o *ApplyOptions) printObjects() error {
if !o.shouldPrintObject() {
return nil
}
infos, err := o.GetObjects()
if err != nil {
return err
}
if count == 0 {
return fmt.Errorf("no objects passed to apply")
}
// print objects
if len(objs) > 0 {
if len(infos) > 0 {
printer, err := o.ToPrinter("")
if err != nil {
return err
}
objToPrint := objs[0]
if len(objs) > 1 {
objToPrint := infos[0].Object
if len(infos) > 1 {
objs := []runtime.Object{}
for _, info := range infos {
objs = append(objs, info.Object)
}
list := &corev1.List{
TypeMeta: metav1.TypeMeta{
Kind: "List",
@ -608,211 +618,47 @@ See http://k8s.io/docs/reference/using-api/api-concepts/#conflicts`, err)
}
}
if !o.Prune {
return nil
}
// MarkNamespaceVisited keeps track of which namespaces the applied
// objects belong to. Used for pruning.
func (o *ApplyOptions) MarkNamespaceVisited(info *resource.Info) {
if info.Namespaced() {
o.VisitedNamespaces.Insert(info.Namespace)
}
}
// MarkNamespaceVisited keeps track of UIDs of the applied
// objects. Used for pruning.
func (o *ApplyOptions) MarkObjectVisited(info *resource.Info) error {
metadata, err := meta.Accessor(info.Object)
if err != nil {
return err
}
o.VisitedUids.Insert(string(metadata.GetUID()))
return nil
}
// PrintAndPrune returns a function which meets the PostProcessorFn
// function signature. This returned function prints all the
// objects as a list (if configured for that), and prunes the
// objects not applied. The returned function is the standard
// apply post processor.
func (o *ApplyOptions) PrintAndPrunePostProcessor() func() error {
return func() error {
if err := o.printObjects(); err != nil {
return err
}
if o.Prune {
p := newPruner(o)
return p.pruneAll(o)
}
return nil
}
p := pruner{
mapper: o.Mapper,
dynamicClient: o.DynamicClient,
labelSelector: o.Selector,
visitedUids: visitedUids,
cascade: o.DeleteOptions.Cascade,
dryRun: o.DryRun,
serverDryRun: o.ServerDryRun,
gracePeriod: o.DeleteOptions.GracePeriod,
toPrinter: o.ToPrinter,
out: o.Out,
}
namespacedRESTMappings, nonNamespacedRESTMappings, err := getRESTMappings(o.Mapper, &(o.PruneResources))
if err != nil {
return fmt.Errorf("error retrieving RESTMappings to prune: %v", err)
}
for n := range visitedNamespaces {
if len(o.Namespace) != 0 && n != o.Namespace {
continue
}
for _, m := range namespacedRESTMappings {
if err := p.prune(n, m); err != nil {
return fmt.Errorf("error pruning namespaced object %v: %v", m.GroupVersionKind, err)
}
}
}
for _, m := range nonNamespacedRESTMappings {
if err := p.prune(metav1.NamespaceNone, m); err != nil {
return fmt.Errorf("error pruning nonNamespaced object %v: %v", m.GroupVersionKind, err)
}
}
return nil
}
type pruneResource struct {
group string
version string
kind string
namespaced bool
}
func (pr pruneResource) String() string {
return fmt.Sprintf("%v/%v, Kind=%v, Namespaced=%v", pr.group, pr.version, pr.kind, pr.namespaced)
}
func getRESTMappings(mapper meta.RESTMapper, pruneResources *[]pruneResource) (namespaced, nonNamespaced []*meta.RESTMapping, err error) {
if len(*pruneResources) == 0 {
// default whitelist
// TODO: need to handle the older api versions - e.g. v1beta1 jobs. Github issue: #35991
*pruneResources = []pruneResource{
{"", "v1", "ConfigMap", true},
{"", "v1", "Endpoints", true},
{"", "v1", "Namespace", false},
{"", "v1", "PersistentVolumeClaim", true},
{"", "v1", "PersistentVolume", false},
{"", "v1", "Pod", true},
{"", "v1", "ReplicationController", true},
{"", "v1", "Secret", true},
{"", "v1", "Service", true},
{"batch", "v1", "Job", true},
{"batch", "v1beta1", "CronJob", true},
{"extensions", "v1beta1", "Ingress", true},
{"apps", "v1", "DaemonSet", true},
{"apps", "v1", "Deployment", true},
{"apps", "v1", "ReplicaSet", true},
{"apps", "v1", "StatefulSet", true},
}
}
for _, resource := range *pruneResources {
addedMapping, err := mapper.RESTMapping(schema.GroupKind{Group: resource.group, Kind: resource.kind}, resource.version)
if err != nil {
return nil, nil, fmt.Errorf("invalid resource %v: %v", resource, err)
}
if resource.namespaced {
namespaced = append(namespaced, addedMapping)
} else {
nonNamespaced = append(nonNamespaced, addedMapping)
}
}
return namespaced, nonNamespaced, nil
}
type pruner struct {
mapper meta.RESTMapper
dynamicClient dynamic.Interface
visitedUids sets.String
labelSelector string
fieldSelector string
cascade bool
serverDryRun bool
dryRun bool
gracePeriod int
toPrinter func(string) (printers.ResourcePrinter, error)
out io.Writer
}
func (p *pruner) prune(namespace string, mapping *meta.RESTMapping) error {
objList, err := p.dynamicClient.Resource(mapping.Resource).
Namespace(namespace).
List(metav1.ListOptions{
LabelSelector: p.labelSelector,
FieldSelector: p.fieldSelector,
})
if err != nil {
return err
}
objs, err := meta.ExtractList(objList)
if err != nil {
return err
}
for _, obj := range objs {
metadata, err := meta.Accessor(obj)
if err != nil {
return err
}
annots := metadata.GetAnnotations()
if _, ok := annots[corev1.LastAppliedConfigAnnotation]; !ok {
// don't prune resources not created with apply
continue
}
uid := metadata.GetUID()
if p.visitedUids.Has(string(uid)) {
continue
}
name := metadata.GetName()
if !p.dryRun {
if err := p.delete(namespace, name, mapping); err != nil {
return err
}
}
printer, err := p.toPrinter("pruned")
if err != nil {
return err
}
printer.PrintObj(obj, p.out)
}
return nil
}
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error {
return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.serverDryRun)
}
func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int, serverDryRun bool) error {
options := &metav1.DeleteOptions{}
if gracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(gracePeriod))
}
if serverDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
policy := metav1.DeletePropagationForeground
if !cascade {
policy = metav1.DeletePropagationOrphan
}
options.PropagationPolicy = &policy
return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, options)
}
func (p *Patcher) delete(namespace, name string) error {
return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun)
}
// Patcher defines options to patch OpenAPI objects.
type Patcher struct {
Mapping *meta.RESTMapping
Helper *resource.Helper
DynamicClient dynamic.Interface
Overwrite bool
BackOff clockwork.Clock
Force bool
Cascade bool
Timeout time.Duration
GracePeriod int
ServerDryRun bool
// If set, forces the patch against a specific resourceVersion
ResourceVersion *string
// Number of retries to make if the patch fails with conflict
Retries int
OpenapiSchema openapi.Resources
}
// DryRunVerifier verifies if a given group-version-kind supports DryRun
@ -853,165 +699,3 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error {
}
return nil
}
func addResourceVersion(patch []byte, rv string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patch, &patchMap)
if err != nil {
return nil, err
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, err
}
a.SetResourceVersion(rv)
return json.Marshal(patchMap)
}
func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
}
// Retrieve the original configuration of the object from the annotation.
original, err := util.GetOriginalConfiguration(obj)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
}
var patchType types.PatchType
var patch []byte
var lookupPatchMeta strategicpatch.LookupPatchMeta
var schema oapi.Schema
createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
// Create the versioned struct from the type defined in the restmapping
// (which is the API version we'll be submitting the patch to)
versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
switch {
case runtime.IsNotRegisteredError(err):
// fall back to generic JSON merge patch
patchType = types.MergePatchType
preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
if err != nil {
if mergepatch.IsPreconditionFailed(err) {
return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
}
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
case err != nil:
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err)
case err == nil:
// Compute a three way strategic merge patch to send to server.
patchType = types.StrategicMergePatchType
// Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
// Otherwise, fall back to baked-in types.
if p.OpenapiSchema != nil {
if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil {
lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
} else {
patchType = types.StrategicMergePatchType
patch = openapiPatch
}
}
}
if patch == nil {
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
}
}
if string(patch) == "{}" {
return patch, obj, nil
}
if p.ResourceVersion != nil {
patch, err = addResourceVersion(patch, *p.ResourceVersion)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err)
}
}
options := metav1.PatchOptions{}
if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options)
return patch, patchedObj, err
}
// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well
// the final patched object. On failure, returns an error.
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
var getErr error
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
if p.Retries == 0 {
p.Retries = maxPatchRetry
}
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
if i > triesBeforeBackOff {
p.BackOff.Sleep(backOffPeriod)
}
current, getErr = p.Helper.Get(namespace, name, false)
if getErr != nil {
return nil, nil, getErr
}
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}
if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force {
patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
}
return patchBytes, patchObject, err
}
func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) {
if err := p.delete(namespace, name); err != nil {
return modified, nil, err
}
// TODO: use wait
if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) {
if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
return modified, nil, err
}
versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
if err != nil {
return modified, nil, err
}
options := metav1.CreateOptions{}
if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options)
if err != nil {
// restore the original object if we fail to create the new one
// but still propagate and advertise error to user
recreated, recreateErr := p.Helper.Create(namespace, true, original, &options)
if recreateErr != nil {
err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr)
} else {
createdObject = recreated
}
}
return modified, createdObject, err
}

View File

@ -0,0 +1,265 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"encoding/json"
"fmt"
"io"
"time"
"github.com/jonboulle/clockwork"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/dynamic"
oapi "k8s.io/kube-openapi/pkg/util/proto"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util"
"k8s.io/kubectl/pkg/util/openapi"
)
const (
// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
maxPatchRetry = 5
// backOffPeriod is the period to back off when apply patch results in error.
backOffPeriod = 1 * time.Second
// how many times we can retry before back off
triesBeforeBackOff = 1
)
// Patcher defines options to patch OpenAPI objects.
type Patcher struct {
Mapping *meta.RESTMapping
Helper *resource.Helper
DynamicClient dynamic.Interface
Overwrite bool
BackOff clockwork.Clock
Force bool
Cascade bool
Timeout time.Duration
GracePeriod int
ServerDryRun bool
// If set, forces the patch against a specific resourceVersion
ResourceVersion *string
// Number of retries to make if the patch fails with conflict
Retries int
OpenapiSchema openapi.Resources
}
func newPatcher(o *ApplyOptions, info *resource.Info) *Patcher {
var openapiSchema openapi.Resources
if o.OpenAPIPatch {
openapiSchema = o.OpenAPISchema
}
helper := resource.NewHelper(info.Client, info.Mapping)
return &Patcher{
Mapping: info.Mapping,
Helper: helper,
DynamicClient: o.DynamicClient,
Overwrite: o.Overwrite,
BackOff: clockwork.NewRealClock(),
Force: o.DeleteOptions.ForceDeletion,
Cascade: o.DeleteOptions.Cascade,
Timeout: o.DeleteOptions.Timeout,
GracePeriod: o.DeleteOptions.GracePeriod,
ServerDryRun: o.ServerDryRun,
OpenapiSchema: openapiSchema,
Retries: maxPatchRetry,
}
}
func (p *Patcher) delete(namespace, name string) error {
return runDelete(namespace, name, p.Mapping, p.DynamicClient, p.Cascade, p.GracePeriod, p.ServerDryRun)
}
func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
}
// Retrieve the original configuration of the object from the annotation.
original, err := util.GetOriginalConfiguration(obj)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
}
var patchType types.PatchType
var patch []byte
var lookupPatchMeta strategicpatch.LookupPatchMeta
var schema oapi.Schema
createPatchErrFormat := "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
// Create the versioned struct from the type defined in the restmapping
// (which is the API version we'll be submitting the patch to)
versionedObject, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
switch {
case runtime.IsNotRegisteredError(err):
// fall back to generic JSON merge patch
patchType = types.MergePatchType
preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
if err != nil {
if mergepatch.IsPreconditionFailed(err) {
return nil, nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
}
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
case err != nil:
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("getting instance of versioned object for %v:", p.Mapping.GroupVersionKind), source, err)
case err == nil:
// Compute a three way strategic merge patch to send to server.
patchType = types.StrategicMergePatchType
// Try to use openapi first if the openapi spec is available and can successfully calculate the patch.
// Otherwise, fall back to baked-in types.
if p.OpenapiSchema != nil {
if schema = p.OpenapiSchema.LookupResource(p.Mapping.GroupVersionKind); schema != nil {
lookupPatchMeta = strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
} else {
patchType = types.StrategicMergePatchType
patch = openapiPatch
}
}
}
if patch == nil {
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
}
}
if string(patch) == "{}" {
return patch, obj, nil
}
if p.ResourceVersion != nil {
patch, err = addResourceVersion(patch, *p.ResourceVersion)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err)
}
}
options := metav1.PatchOptions{}
if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, &options)
return patch, patchedObj, err
}
// Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well
// the final patched object. On failure, returns an error.
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
var getErr error
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
if p.Retries == 0 {
p.Retries = maxPatchRetry
}
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
if i > triesBeforeBackOff {
p.BackOff.Sleep(backOffPeriod)
}
current, getErr = p.Helper.Get(namespace, name, false)
if getErr != nil {
return nil, nil, getErr
}
patchBytes, patchObject, err = p.patchSimple(current, modified, source, namespace, name, errOut)
}
if err != nil && (errors.IsConflict(err) || errors.IsInvalid(err)) && p.Force {
patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
}
return patchBytes, patchObject, err
}
func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) {
if err := p.delete(namespace, name); err != nil {
return modified, nil, err
}
// TODO: use wait
if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) {
if _, err := p.Helper.Get(namespace, name, false); !errors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
return modified, nil, err
}
versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
if err != nil {
return modified, nil, err
}
options := metav1.CreateOptions{}
if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
createdObject, err := p.Helper.Create(namespace, true, versionedObject, &options)
if err != nil {
// restore the original object if we fail to create the new one
// but still propagate and advertise error to user
recreated, recreateErr := p.Helper.Create(namespace, true, original, &options)
if recreateErr != nil {
err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr)
} else {
createdObject = recreated
}
}
return modified, createdObject, err
}
func addResourceVersion(patch []byte, rv string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patch, &patchMap)
if err != nil {
return nil, err
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, err
}
a.SetResourceVersion(rv)
return json.Marshal(patchMap)
}

View File

@ -0,0 +1,243 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"fmt"
"io"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/client-go/dynamic"
)
type pruner struct {
mapper meta.RESTMapper
dynamicClient dynamic.Interface
visitedUids sets.String
visitedNamespaces sets.String
labelSelector string
fieldSelector string
cascade bool
serverDryRun bool
dryRun bool
gracePeriod int
toPrinter func(string) (printers.ResourcePrinter, error)
out io.Writer
}
func newPruner(o *ApplyOptions) pruner {
return pruner{
mapper: o.Mapper,
dynamicClient: o.DynamicClient,
labelSelector: o.Selector,
visitedUids: o.VisitedUids,
visitedNamespaces: o.VisitedNamespaces,
cascade: o.DeleteOptions.Cascade,
dryRun: o.DryRun,
serverDryRun: o.ServerDryRun,
gracePeriod: o.DeleteOptions.GracePeriod,
toPrinter: o.ToPrinter,
out: o.Out,
}
}
func (p *pruner) pruneAll(o *ApplyOptions) error {
namespacedRESTMappings, nonNamespacedRESTMappings, err := getRESTMappings(o.Mapper, &(o.PruneResources))
if err != nil {
return fmt.Errorf("error retrieving RESTMappings to prune: %v", err)
}
for n := range p.visitedNamespaces {
if len(o.Namespace) != 0 && n != o.Namespace {
continue
}
for _, m := range namespacedRESTMappings {
if err := p.prune(n, m); err != nil {
return fmt.Errorf("error pruning namespaced object %v: %v", m.GroupVersionKind, err)
}
}
}
for _, m := range nonNamespacedRESTMappings {
if err := p.prune(metav1.NamespaceNone, m); err != nil {
return fmt.Errorf("error pruning nonNamespaced object %v: %v", m.GroupVersionKind, err)
}
}
return nil
}
func (p *pruner) prune(namespace string, mapping *meta.RESTMapping) error {
objList, err := p.dynamicClient.Resource(mapping.Resource).
Namespace(namespace).
List(metav1.ListOptions{
LabelSelector: p.labelSelector,
FieldSelector: p.fieldSelector,
})
if err != nil {
return err
}
objs, err := meta.ExtractList(objList)
if err != nil {
return err
}
for _, obj := range objs {
metadata, err := meta.Accessor(obj)
if err != nil {
return err
}
annots := metadata.GetAnnotations()
if _, ok := annots[corev1.LastAppliedConfigAnnotation]; !ok {
// don't prune resources not created with apply
continue
}
uid := metadata.GetUID()
if p.visitedUids.Has(string(uid)) {
continue
}
name := metadata.GetName()
if !p.dryRun {
if err := p.delete(namespace, name, mapping); err != nil {
return err
}
}
printer, err := p.toPrinter("pruned")
if err != nil {
return err
}
printer.PrintObj(obj, p.out)
}
return nil
}
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error {
return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.serverDryRun)
}
func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int, serverDryRun bool) error {
options := &metav1.DeleteOptions{}
if gracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(gracePeriod))
}
if serverDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
policy := metav1.DeletePropagationForeground
if !cascade {
policy = metav1.DeletePropagationOrphan
}
options.PropagationPolicy = &policy
return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, options)
}
type pruneResource struct {
group string
version string
kind string
namespaced bool
}
func (pr pruneResource) String() string {
return fmt.Sprintf("%v/%v, Kind=%v, Namespaced=%v", pr.group, pr.version, pr.kind, pr.namespaced)
}
func getRESTMappings(mapper meta.RESTMapper, pruneResources *[]pruneResource) (namespaced, nonNamespaced []*meta.RESTMapping, err error) {
if len(*pruneResources) == 0 {
// default whitelist
// TODO: need to handle the older api versions - e.g. v1beta1 jobs. Github issue: #35991
*pruneResources = []pruneResource{
{"", "v1", "ConfigMap", true},
{"", "v1", "Endpoints", true},
{"", "v1", "Namespace", false},
{"", "v1", "PersistentVolumeClaim", true},
{"", "v1", "PersistentVolume", false},
{"", "v1", "Pod", true},
{"", "v1", "ReplicationController", true},
{"", "v1", "Secret", true},
{"", "v1", "Service", true},
{"batch", "v1", "Job", true},
{"batch", "v1beta1", "CronJob", true},
{"extensions", "v1beta1", "Ingress", true},
{"apps", "v1", "DaemonSet", true},
{"apps", "v1", "Deployment", true},
{"apps", "v1", "ReplicaSet", true},
{"apps", "v1", "StatefulSet", true},
}
}
for _, resource := range *pruneResources {
addedMapping, err := mapper.RESTMapping(schema.GroupKind{Group: resource.group, Kind: resource.kind}, resource.version)
if err != nil {
return nil, nil, fmt.Errorf("invalid resource %v: %v", resource, err)
}
if resource.namespaced {
namespaced = append(namespaced, addedMapping)
} else {
nonNamespaced = append(nonNamespaced, addedMapping)
}
}
return namespaced, nonNamespaced, nil
}
func parsePruneResources(mapper meta.RESTMapper, gvks []string) ([]pruneResource, error) {
pruneResources := []pruneResource{}
for _, groupVersionKind := range gvks {
gvk := strings.Split(groupVersionKind, "/")
if len(gvk) != 3 {
return nil, fmt.Errorf("invalid GroupVersionKind format: %v, please follow <group/version/kind>", groupVersionKind)
}
if gvk[0] == "core" {
gvk[0] = ""
}
mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk[0], Kind: gvk[2]}, gvk[1])
if err != nil {
return pruneResources, err
}
var namespaced bool
namespaceScope := mapping.Scope.Name()
switch namespaceScope {
case meta.RESTScopeNameNamespace:
namespaced = true
case meta.RESTScopeNameRoot:
namespaced = false
default:
return pruneResources, fmt.Errorf("Unknown namespace scope: %q", namespaceScope)
}
pruneResources = append(pruneResources, pruneResource{gvk[0], gvk[1], gvk[2], namespaced})
}
return pruneResources, nil
}