move FinishRequest to its own package

This commit is contained in:
Abu Kashem 2021-03-22 16:39:14 -04:00
parent 2f8a2258f4
commit 393a1f73fb
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
9 changed files with 243 additions and 178 deletions

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -157,7 +158,7 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -124,7 +125,7 @@ func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope *RequestSc
wasDeleted := true
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
obj, deleted, err := r.Delete(ctx, name, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options)
wasDeleted = deleted
return obj, err
@ -267,7 +268,7 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
admit = admission.WithAudit(admit, ae)
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttrs := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, "", scope.Resource, scope.Subresource, admission.Delete, options, dryrun.IsDryRun(options.DryRun), userInfo)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
return r.DeleteCollection(ctx, rest.AdmissionToValidateObjectDeleteFunc(admit, staticAdmissionAttrs, scope), options, &listOptions)
})
if err != nil {

View File

@ -0,0 +1,82 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package finisher
import (
"context"
"fmt"
"net/http"
goruntime "runtime"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// ResultFunc is a function that returns a rest result and can be run in a goroutine
type ResultFunc func() (runtime.Object, error)
// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response.
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
func FinishRequest(ctx context.Context, fn ResultFunc) (result runtime.Object, err error) {
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
// when the select statement reads something other than the one the goroutine sends on.
ch := make(chan runtime.Object, 1)
errCh := make(chan error, 1)
panicCh := make(chan interface{}, 1)
go func() {
// panics don't cross goroutine boundaries, so we have to handle ourselves
defer func() {
panicReason := recover()
if panicReason != nil {
// do not wrap the sentinel ErrAbortHandler panic value
if panicReason != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
panicReason = fmt.Sprintf("%v\n%s", panicReason, buf)
}
// Propagate to parent goroutine
panicCh <- panicReason
}
}()
if result, err := fn(); err != nil {
errCh <- err
} else {
ch <- result
}
}()
select {
case result = <-ch:
if status, ok := result.(*metav1.Status); ok {
if status.Status != metav1.StatusSuccess {
return nil, errors.FromObject(status)
}
}
return result, nil
case err = <-errCh:
return nil, err
case p := <-panicCh:
panic(p)
case <-ctx.Done():
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
}
}

View File

@ -0,0 +1,151 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package finisher
import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/apis/example"
)
func TestFinishRequest(t *testing.T) {
exampleObj := &example.Pod{}
exampleErr := fmt.Errorf("error")
successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"}
errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"}
timeoutFunc := func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
testcases := []struct {
name string
timeout func() (context.Context, context.CancelFunc)
fn ResultFunc
expectedObj runtime.Object
expectedErr error
expectedPanic string
expectedPanicObj interface{}
}{
{
name: "Expected obj is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return exampleObj, nil
},
expectedObj: exampleObj,
expectedErr: nil,
},
{
name: "Expected error is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return nil, exampleErr
},
expectedObj: nil,
expectedErr: exampleErr,
},
{
name: "Successful status object is returned as expected",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return successStatusObj, nil
},
expectedObj: successStatusObj,
expectedErr: nil,
},
{
name: "Error status object is converted to StatusError",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return errorStatusObj, nil
},
expectedObj: nil,
expectedErr: apierrors.FromObject(errorStatusObj),
},
{
name: "Panic is propagated up",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "my panic",
},
{
name: "Panic is propagated with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "finisher_test.go",
},
{
name: "http.ErrAbortHandler panic is propagated without wrapping with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic(http.ErrAbortHandler)
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: http.ErrAbortHandler.Error(),
expectedPanicObj: http.ErrAbortHandler,
},
}
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := tc.timeout()
defer func() {
cancel()
r := recover()
switch {
case r == nil && len(tc.expectedPanic) > 0:
t.Errorf("expected panic containing '%s', got none", tc.expectedPanic)
case r != nil && len(tc.expectedPanic) == 0:
t.Errorf("unexpected panic: %v", r)
case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic):
t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r)
}
if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) {
t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r)
}
}()
obj, err := FinishRequest(ctx, tc.fn)
if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) {
t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err)
}
if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) {
t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj)
}
})
}
}

View File

@ -43,6 +43,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -590,7 +591,7 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti
wasCreated = created
return updateObject, updateErr
}
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

View File

@ -24,7 +24,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
goruntime "runtime"
"strings"
"time"
@ -225,60 +224,6 @@ func (r *responder) Error(err error) {
r.scope.err(err, r.w, r.req)
}
// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)
// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
func finishRequest(ctx context.Context, fn resultFunc) (result runtime.Object, err error) {
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
// when the select statement reads something other than the one the goroutine sends on.
ch := make(chan runtime.Object, 1)
errCh := make(chan error, 1)
panicCh := make(chan interface{}, 1)
go func() {
// panics don't cross goroutine boundaries, so we have to handle ourselves
defer func() {
panicReason := recover()
if panicReason != nil {
// do not wrap the sentinel ErrAbortHandler panic value
if panicReason != http.ErrAbortHandler {
// Same as stdlib http server code. Manually allocate stack
// trace buffer size to prevent excessively large logs
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:goruntime.Stack(buf, false)]
panicReason = fmt.Sprintf("%v\n%s", panicReason, buf)
}
// Propagate to parent goroutine
panicCh <- panicReason
}
}()
if result, err := fn(); err != nil {
errCh <- err
} else {
ch <- result
}
}()
select {
case result = <-ch:
if status, ok := result.(*metav1.Status); ok {
if status.Status != metav1.StatusSuccess {
return nil, errors.FromObject(status)
}
}
return result, nil
case err = <-errCh:
return nil, err
case p := <-panicCh:
panic(p)
case <-ctx.Done():
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
}
}
// transformDecodeError adds additional information into a bad-request api error when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error {
objGVKs, _, err := typer.ObjectKinds(into)

View File

@ -826,124 +826,6 @@ func TestHasUID(t *testing.T) {
}
}
func TestFinishRequest(t *testing.T) {
exampleObj := &example.Pod{}
exampleErr := fmt.Errorf("error")
successStatusObj := &metav1.Status{Status: metav1.StatusSuccess, Message: "success message"}
errorStatusObj := &metav1.Status{Status: metav1.StatusFailure, Message: "error message"}
timeoutFunc := func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
testcases := []struct {
name string
timeout func() (context.Context, context.CancelFunc)
fn resultFunc
expectedObj runtime.Object
expectedErr error
expectedPanic string
expectedPanicObj interface{}
}{
{
name: "Expected obj is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return exampleObj, nil
},
expectedObj: exampleObj,
expectedErr: nil,
},
{
name: "Expected error is returned",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return nil, exampleErr
},
expectedObj: nil,
expectedErr: exampleErr,
},
{
name: "Successful status object is returned as expected",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return successStatusObj, nil
},
expectedObj: successStatusObj,
expectedErr: nil,
},
{
name: "Error status object is converted to StatusError",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
return errorStatusObj, nil
},
expectedObj: nil,
expectedErr: apierrors.FromObject(errorStatusObj),
},
{
name: "Panic is propagated up",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "my panic",
},
{
name: "Panic is propagated with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic("my panic")
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: "rest_test.go",
},
{
name: "http.ErrAbortHandler panic is propagated without wrapping with stack",
timeout: timeoutFunc,
fn: func() (runtime.Object, error) {
panic(http.ErrAbortHandler)
},
expectedObj: nil,
expectedErr: nil,
expectedPanic: http.ErrAbortHandler.Error(),
expectedPanicObj: http.ErrAbortHandler,
},
}
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := tc.timeout()
defer func() {
cancel()
r := recover()
switch {
case r == nil && len(tc.expectedPanic) > 0:
t.Errorf("expected panic containing '%s', got none", tc.expectedPanic)
case r != nil && len(tc.expectedPanic) == 0:
t.Errorf("unexpected panic: %v", r)
case r != nil && len(tc.expectedPanic) > 0 && !strings.Contains(fmt.Sprintf("%v", r), tc.expectedPanic):
t.Errorf("expected panic containing '%s', got '%v'", tc.expectedPanic, r)
}
if tc.expectedPanicObj != nil && !reflect.DeepEqual(tc.expectedPanicObj, r) {
t.Errorf("expected panic obj %#v, got %#v", tc.expectedPanicObj, r)
}
}()
obj, err := finishRequest(ctx, tc.fn)
if (err == nil && tc.expectedErr != nil) || (err != nil && tc.expectedErr == nil) || (err != nil && tc.expectedErr != nil && err.Error() != tc.expectedErr.Error()) {
t.Errorf("%d: unexpected err. expected: %v, got: %v", i, tc.expectedErr, err)
}
if !apiequality.Semantic.DeepEqual(obj, tc.expectedObj) {
t.Errorf("%d: unexpected obj. expected %#v, got %#v", i, tc.expectedObj, obj)
}
})
}
}
func setTcPod(tcPod *example.Pod, name string, namespace string, uid types.UID, resourceVersion string, apiVersion string, activeDeadlineSeconds *int64, nodeName string) {
tcPod.Name = name
tcPod.Namespace = namespace

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/finisher"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
@ -198,7 +199,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
// Dedup owner references before updating managed fields
dedupOwnerReferencesAndAddWarning(obj, req.Context(), false)
result, err := finishRequest(ctx, func() (runtime.Object, error) {
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.

1
vendor/modules.txt vendored
View File

@ -1818,6 +1818,7 @@ k8s.io/apiserver/pkg/endpoints/filters
k8s.io/apiserver/pkg/endpoints/handlers
k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager
k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager/internal
k8s.io/apiserver/pkg/endpoints/handlers/finisher
k8s.io/apiserver/pkg/endpoints/handlers/negotiation
k8s.io/apiserver/pkg/endpoints/handlers/responsewriters
k8s.io/apiserver/pkg/endpoints/metrics