Merge pull request #100523 from tkashem/refactor-finish-request

Refactor rest.FinishRequest function
This commit is contained in:
Kubernetes Prow Robot 2021-04-08 22:10:40 -07:00 committed by GitHub
commit a5489431cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 277 additions and 178 deletions

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -157,7 +158,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -124,7 +125,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
wasDeleted := true
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
wasDeleted = deleted
return obj, err
@ -267,7 +268,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
admit = admission.WithAudit(admit, ae)
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions)
})
if err != nil {

View File

@ -0,0 +1,109 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package finisher
import (
"context"
"fmt"
"net/http"
goruntime "runtime"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// ResultFunc is a function that returns a rest result and can be run in a goroutine
type ResultFunc func() (runtime.Object, error)
// result stores the return values or panic from a ResultFunc function
type result struct {
// object stores the response returned by the ResultFunc function
object runtime.Object
// err stores the error returned by the ResultFunc function
err error
// reason stores the reason from a panic thrown by the ResultFunc function
reason interface{}
}
// Return processes the result returned by a ResultFunc function
func (r *result) Return() (runtime.Object, error) {
switch {
case r.reason != nil:
// panic has higher precedence, the goroutine executing ResultFunc has panic'd,
// so propagate a panic to the caller.
panic(r.reason)
case r.err != nil:
return nil, r.err
default:
// if we are here, it means neither a panic, nor an error
if status, ok := r.object.(*metav1.Status); ok {
// An api.Status object with status != success is considered an "error",
// which interrupts the normal response flow.
if status.Status != metav1.StatusSuccess {
return nil, errors.FromObject(status)
}
}
return r.object, nil
}
}
// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response.
func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) {
// the channel needs to be buffered to prevent the goroutine below from hanging indefinitely
// when the select statement reads something other than the one the goroutine sends on.
resultCh := make(chan *result, 1)
go func() {
result := &result{}
// panics don't cross goroutine boundaries, so we have to handle ourselves
defer func() {
reason := recover()
if reason != nil {
// do not wrap the sentinel ErrAbortHandler panic value
if reason != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
reason = fmt.Sprintf("%v\n%s", reason, buf)
}
// store the panic reason into the result.
result.reason = reason
}
// Propagate the result to the parent goroutine
resultCh <- result
}()
if object, err := fn(); err != nil {
result.err = err
} else {
result.object = object
}
}()
select {
case result := <-resultCh:
return result.Return()
case <-ctx.Done():
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
}
}

View File

@ -0,0 +1,158 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package finisher
import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
)
func TestFinishRequest(t *testing.T) {
exampleObj := &example.Pod{}
exampleErr := fmt.Errorf("error")
successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"}
errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"}
timeoutFunc := func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
testcases := []struct {
name string
timeout func() (context.Context, context.CancelFunc)
fn ResultFunc
expectedObj runtime.Object
expectedErr error
expectedPanic string
expectedPanicObj interface{}
}{
{
name: "Expected obj is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return exampleObj, nil
},
expectedObj: exampleObj,
expectedErr: nil,
},
{
name: "Expected error is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return nil, exampleErr
},
expectedObj: nil,
expectedErr: exampleErr,
},
{
name: "No expected error or object or panic",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return nil, nil
},
},
{
name: "Successful status object is returned as expected",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return successStatusObj, nil
},
expectedObj: successStatusObj,
expectedErr: nil,
},
{
name: "Error status object is converted to StatusError",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return errorStatusObj, nil
},
expectedObj: nil,
expectedErr: apierrors.FromObject(errorStatusObj),
},
{
name: "Panic is propagated up",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "my panic",
},
{
name: "Panic is propagated with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "finisher_test.go",
},
{
name: "http.ErrAbortHandler panic is propagated without wrapping with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic(http.ErrAbortHandler)
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: http.ErrAbortHandler.Error(),
expectedPanicObj: http.ErrAbortHandler,
},
}
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := tc.timeout()
defer func() {
cancel()
r := recover()
switch {
case r == nil && len(tc.expectedPanic) > 0:
t.Errorf("expected panic containing '%s', got none", tc.expectedPanic)
case r != nil && len(tc.expectedPanic) == 0:
t.Errorf("unexpected panic: %v", r)
case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic):
t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r)
}
if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) {
t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r)
}
}()
obj, err := FinishRequest(ctx, tc.fn)
if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) {
t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err)
}
if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) {
t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj)
}
})
}
}

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -590,7 +591,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
wasCreated = created
return updateObject, updateErr
}
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

View File

@ -24,7 +24,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
goruntime "runtime"
"strings"
"time"
@ -225,60 +224,6 @@ func (r *responder) Error(err error) {
r.scope.err(err, r.w, r.req)
}
// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)
// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, err error) {
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
// when the select statement reads something other than the one the goroutine sends on.
ch := make(chan runtime.Object, 1)
errCh := make(chan error, 1)
panicCh := make(chan interface{}, 1)
go func() {
// panics don't cross goroutine boundaries, so we have to handle ourselves
defer func() {
panicReason := recover()
if panicReason != nil {
// do not wrap the sentinel ErrAbortHandler panic value
if panicReason != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
panicReason = fmt.Sprintf("%v\n%s", panicReason, buf)
}
// Propagate to parent goroutine
panicCh <- panicReason
}
}()
if result, err := fn(); err != nil {
errCh <- err
} else {
ch <- result
}
}()
select {
case result = <-ch:
if status, ok := result.(*metav1.Status); ok {
if status.Status != metav1.StatusSuccess {
return nil, errors.FromObject(status)
}
}
return result, nil
case err = <-errCh:
return nil, err
case p := <-panicCh:
panic(p)
case <-ctx.Done():
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
}
}
// transformDecodeError adds additional information into a bad-request api error when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error {
objGVKs, _, err := typer.ObjectKinds(into)

View File

@ -826,124 +826,6 @@ func TestHasUID(t *testing.T) {
}
}
func TestFinishRequest(t *testing.T) {
exampleObj := &example.Pod{}
exampleErr := fmt.Errorf("error")
successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"}
errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"}
timeoutFunc := func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
testcases := []struct {
name string
timeout func() (context.Context, context.CancelFunc)
fn resultFunc
expectedObj runtime.Object
expectedErr error
expectedPanic string
expectedPanicObj interface{}
}{
{
name: "Expected obj is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return exampleObj, nil
},
expectedObj: exampleObj,
expectedErr: nil,
},
{
name: "Expected error is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return nil, exampleErr
},
expectedObj: nil,
expectedErr: exampleErr,
},
{
name: "Successful status object is returned as expected",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return successStatusObj, nil
},
expectedObj: successStatusObj,
expectedErr: nil,
},
{
name: "Error status object is converted to StatusError",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return errorStatusObj, nil
},
expectedObj: nil,
expectedErr: apierrors.FromObject(errorStatusObj),
},
{
name: "Panic is propagated up",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "my panic",
},
{
name: "Panic is propagated with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "rest_test.go",
},
{
name: "http.ErrAbortHandler panic is propagated without wrapping with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic(http.ErrAbortHandler)
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: http.ErrAbortHandler.Error(),
expectedPanicObj: http.ErrAbortHandler,
},
}
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := tc.timeout()
defer func() {
cancel()
r := recover()
switch {
case r == nil && len(tc.expectedPanic) > 0:
t.Errorf("expected panic containing '%s', got none", tc.expectedPanic)
case r != nil && len(tc.expectedPanic) == 0:
t.Errorf("unexpected panic: %v", r)
case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic):
t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r)
}
if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) {
t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r)
}
}()
obj, err := finishRequest(ctx, tc.fn)
if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) {
t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err)
}
if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) {
t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj)
}
})
}
}
func setTcPod(tcPod *example.Pod, name string, namespace string, uid types.UID, resourceVersion string, apiVersion string, activeDeadlineSeconds *int64, nodeName string) {
tcPod.Name = name
tcPod.Namespace = namespace

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -198,7 +199,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

1
vendor/modules.txt vendored
View File

@ -1818,6 +1818,7 @@ k8s.io/apiserver/pkg/endpoints/filters
k8s.io/apiserver/pkg/endpoints/handlers
k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager
k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal
k8s.io/apiserver/pkg/endpoints/handlers/finisher
k8s.io/apiserver/pkg/endpoints/handlers/negotiation
k8s.io/apiserver/pkg/endpoints/handlers/responsewriters
k8s.io/apiserver/pkg/endpoints/metrics