diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go index b2e167f26fd..79fc5afabd0 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -157,7 +158,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int } // Dedup owner references before updating managed fields dedupOwnerReferencesAndAddWarning(obj, req.Context(), false) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { if scope.FieldManager != nil { liveObj, err := scope.Creater.New(scope.Kind) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go index 545ed897c2b..c1a1fc987ee 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/delete.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/audit" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -124,7 +125,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc wasDeleted := true userInfo, _ := request.UserFrom(ctx) staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options) wasDeleted = deleted return obj, err @@ -267,7 +268,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc admit = admission.WithAudit(admit, ae) userInfo, _ := request.UserFrom(ctx) staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions) }) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go new file mode 100644 index 00000000000..9832b5b0f0b --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher.go @@ -0,0 +1,82 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package finisher + +import ( + "context" + "fmt" + "net/http" + goruntime "runtime" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// ResultFunc is a function that returns a rest result and can be run in a goroutine +type ResultFunc func() (runtime.Object, error) + +// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response. +// An api.Status object with status != success is considered an "error", which interrupts the normal response flow. +func FinishRequest(ctx context.Context, fn ResultFunc) (result runtime.Object, err error) { + // these channels need to be buffered to prevent the goroutine below from hanging indefinitely + // when the select statement reads something other than the one the goroutine sends on. + ch := make(chan runtime.Object, 1) + errCh := make(chan error, 1) + panicCh := make(chan interface{}, 1) + go func() { + // panics don't cross goroutine boundaries, so we have to handle ourselves + defer func() { + panicReason := recover() + if panicReason != nil { + // do not wrap the sentinel ErrAbortHandler panic value + if panicReason != http.ErrAbortHandler { + // Same as stdlib http server code. Manually allocate stack + // trace buffer size to prevent excessively large logs + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:goruntime.Stack(buf, false)] + panicReason = fmt.Sprintf("%v\n%s", panicReason, buf) + } + // Propagate to parent goroutine + panicCh <- panicReason + } + }() + + if result, err := fn(); err != nil { + errCh <- err + } else { + ch <- result + } + }() + + select { + case result = <-ch: + if status, ok := result.(*metav1.Status); ok { + if status.Status != metav1.StatusSuccess { + return nil, errors.FromObject(status) + } + } + return result, nil + case err = <-errCh: + return nil, err + case p := <-panicCh: + panic(p) + case <-ctx.Done(): + return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go new file mode 100644 index 00000000000..24aa6ffe83c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/finisher/finisher_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package finisher + +import ( + "context" + "fmt" + "net/http" + "reflect" + "strings" + "testing" + "time" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/apis/example" +) + +func TestFinishRequest(t *testing.T) { + exampleObj := &example.Pod{} + exampleErr := fmt.Errorf("error") + successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"} + errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"} + timeoutFunc := func() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.TODO(), time.Second) + } + + testcases := []struct { + name string + timeout func() (context.Context, context.CancelFunc) + fn ResultFunc + expectedObj runtime.Object + expectedErr error + expectedPanic string + + expectedPanicObj interface{} + }{ + { + name: "Expected obj is returned", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return exampleObj, nil + }, + expectedObj: exampleObj, + expectedErr: nil, + }, + { + name: "Expected error is returned", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return nil, exampleErr + }, + expectedObj: nil, + expectedErr: exampleErr, + }, + { + name: "Successful status object is returned as expected", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return successStatusObj, nil + }, + expectedObj: successStatusObj, + expectedErr: nil, + }, + { + name: "Error status object is converted to StatusError", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + return errorStatusObj, nil + }, + expectedObj: nil, + expectedErr: apierrors.FromObject(errorStatusObj), + }, + { + name: "Panic is propagated up", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic("my panic") + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: "my panic", + }, + { + name: "Panic is propagated with stack", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic("my panic") + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: "finisher_test.go", + }, + { + name: "http.ErrAbortHandler panic is propagated without wrapping with stack", + timeout: timeoutFunc, + fn: func() (runtime.Object, error) { + panic(http.ErrAbortHandler) + }, + expectedObj: nil, + expectedErr: nil, + expectedPanic: http.ErrAbortHandler.Error(), + expectedPanicObj: http.ErrAbortHandler, + }, + } + for i, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := tc.timeout() + defer func() { + cancel() + + r := recover() + switch { + case r == nil && len(tc.expectedPanic) > 0: + t.Errorf("expected panic containing '%s', got none", tc.expectedPanic) + case r != nil && len(tc.expectedPanic) == 0: + t.Errorf("unexpected panic: %v", r) + case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic): + t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r) + } + + if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) { + t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r) + } + }() + obj, err := FinishRequest(ctx, tc.fn) + if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) { + t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err) + } + if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) { + t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go index 903307fc285..1abcccaa170 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go @@ -43,6 +43,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -590,7 +591,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti wasCreated = created return updateObject, updateErr } - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { result, err := requestFunc() // If the object wasn't committed to storage because it's serialized size was too large, // it is safe to remove managedFields (which can be large) and try again. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go index 783ab96b9b9..f618b3867d6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "net/http" "net/url" - goruntime "runtime" "strings" "time" @@ -225,60 +224,6 @@ func (r *responder) Error(err error) { r.scope.err(err, r.w, r.req) } -// resultFunc is a function that returns a rest result and can be run in a goroutine -type resultFunc func() (runtime.Object, error) - -// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. -// An api.Status object with status != success is considered an "error", which interrupts the normal response flow. -func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, err error) { - // these channels need to be buffered to prevent the goroutine below from hanging indefinitely - // when the select statement reads something other than the one the goroutine sends on. - ch := make(chan runtime.Object, 1) - errCh := make(chan error, 1) - panicCh := make(chan interface{}, 1) - go func() { - // panics don't cross goroutine boundaries, so we have to handle ourselves - defer func() { - panicReason := recover() - if panicReason != nil { - // do not wrap the sentinel ErrAbortHandler panic value - if panicReason != http.ErrAbortHandler { - // Same as stdlib http server code. Manually allocate stack - // trace buffer size to prevent excessively large logs - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:goruntime.Stack(buf, false)] - panicReason = fmt.Sprintf("%v\n%s", panicReason, buf) - } - // Propagate to parent goroutine - panicCh <- panicReason - } - }() - - if result, err := fn(); err != nil { - errCh <- err - } else { - ch <- result - } - }() - - select { - case result = <-ch: - if status, ok := result.(*metav1.Status); ok { - if status.Status != metav1.StatusSuccess { - return nil, errors.FromObject(status) - } - } - return result, nil - case err = <-errCh: - return nil, err - case p := <-panicCh: - panic(p) - case <-ctx.Done(): - return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0) - } -} - // transformDecodeError adds additional information into a bad-request api error when a decode fails. func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error { objGVKs, _, err := typer.ObjectKinds(into) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go index 5bd34f004f0..a2e27076e52 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest_test.go @@ -826,124 +826,6 @@ func TestHasUID(t *testing.T) { } } -func TestFinishRequest(t *testing.T) { - exampleObj := &example.Pod{} - exampleErr := fmt.Errorf("error") - successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"} - errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"} - timeoutFunc := func() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.TODO(), time.Second) - } - - testcases := []struct { - name string - timeout func() (context.Context, context.CancelFunc) - fn resultFunc - expectedObj runtime.Object - expectedErr error - expectedPanic string - - expectedPanicObj interface{} - }{ - { - name: "Expected obj is returned", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return exampleObj, nil - }, - expectedObj: exampleObj, - expectedErr: nil, - }, - { - name: "Expected error is returned", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return nil, exampleErr - }, - expectedObj: nil, - expectedErr: exampleErr, - }, - { - name: "Successful status object is returned as expected", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return successStatusObj, nil - }, - expectedObj: successStatusObj, - expectedErr: nil, - }, - { - name: "Error status object is converted to StatusError", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - return errorStatusObj, nil - }, - expectedObj: nil, - expectedErr: apierrors.FromObject(errorStatusObj), - }, - { - name: "Panic is propagated up", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic("my panic") - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: "my panic", - }, - { - name: "Panic is propagated with stack", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic("my panic") - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: "rest_test.go", - }, - { - name: "http.ErrAbortHandler panic is propagated without wrapping with stack", - timeout: timeoutFunc, - fn: func() (runtime.Object, error) { - panic(http.ErrAbortHandler) - }, - expectedObj: nil, - expectedErr: nil, - expectedPanic: http.ErrAbortHandler.Error(), - expectedPanicObj: http.ErrAbortHandler, - }, - } - for i, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := tc.timeout() - defer func() { - cancel() - - r := recover() - switch { - case r == nil && len(tc.expectedPanic) > 0: - t.Errorf("expected panic containing '%s', got none", tc.expectedPanic) - case r != nil && len(tc.expectedPanic) == 0: - t.Errorf("unexpected panic: %v", r) - case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic): - t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r) - } - - if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) { - t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r) - } - }() - obj, err := finishRequest(ctx, tc.fn) - if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) { - t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err) - } - if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) { - t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj) - } - }) - } -} - func setTcPod(tcPod *example.Pod, name string, namespace string, uid types.UID, resourceVersion string, apiVersion string, activeDeadlineSeconds *int64, nodeName string) { tcPod.Name = name tcPod.Namespace = namespace diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go index 57daefd9cf1..b66b57a8b62 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go @@ -34,6 +34,7 @@ import ( "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" + "k8s.io/apiserver/pkg/endpoints/handlers/finisher" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" @@ -198,7 +199,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa } // Dedup owner references before updating managed fields dedupOwnerReferencesAndAddWarning(obj, req.Context(), false) - result, err := finishRequest(ctx, func() (runtime.Object, error) { + result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { result, err := requestFunc() // If the object wasn't committed to storage because it's serialized size was too large, // it is safe to remove managedFields (which can be large) and try again. diff --git a/vendor/modules.txt b/vendor/modules.txt index 2749d593e88..94b9515bd54 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1818,6 +1818,7 @@ k8s.io/apiserver/pkg/endpoints/filters k8s.io/apiserver/pkg/endpoints/handlers k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal +k8s.io/apiserver/pkg/endpoints/handlers/finisher k8s.io/apiserver/pkg/endpoints/handlers/negotiation k8s.io/apiserver/pkg/endpoints/handlers/responsewriters k8s.io/apiserver/pkg/endpoints/metrics