mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
refactor finishRequest
This commit is contained in:
parent
393a1f73fb
commit
a8ff821a19
@ -30,52 +30,79 @@ import (
|
||||
// ResultFunc is a function that returns a rest result and can be run in a goroutine
|
||||
type ResultFunc func() (runtime.Object, error)
|
||||
|
||||
// result stores the return values or panic from a ResultFunc function
|
||||
type result struct {
|
||||
// object stores the response returned by the ResultFunc function
|
||||
object runtime.Object
|
||||
// err stores the error returned by the ResultFunc function
|
||||
err error
|
||||
// reason stores the reason from a panic thrown by the ResultFunc function
|
||||
reason interface{}
|
||||
}
|
||||
|
||||
// Return processes the result returned by a ResultFunc function
|
||||
func (r *result) Return() (runtime.Object, error) {
|
||||
switch {
|
||||
case r.reason != nil:
|
||||
// panic has higher precedence, the goroutine executing ResultFunc has panic'd,
|
||||
// so propagate a panic to the caller.
|
||||
panic(r.reason)
|
||||
case r.err != nil:
|
||||
return nil, r.err
|
||||
default:
|
||||
// if we are here, it means neither a panic, nor an error
|
||||
if status, ok := r.object.(*metav1.Status); ok {
|
||||
// An api.Status object with status != success is considered an "error",
|
||||
// which interrupts the normal response flow.
|
||||
if status.Status != metav1.StatusSuccess {
|
||||
return nil, errors.FromObject(status)
|
||||
}
|
||||
}
|
||||
return r.object, nil
|
||||
}
|
||||
}
|
||||
|
||||
// FinishRequest makes a given ResultFunc asynchronous and handles errors returned by the response.
|
||||
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
|
||||
func FinishRequest(ctx context.Context, fn ResultFunc) (result runtime.Object, err error) {
|
||||
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
|
||||
func FinishRequest(ctx context.Context, fn ResultFunc) (runtime.Object, error) {
|
||||
// the channel needs to be buffered to prevent the goroutine below from hanging indefinitely
|
||||
// when the select statement reads something other than the one the goroutine sends on.
|
||||
ch := make(chan runtime.Object, 1)
|
||||
errCh := make(chan error, 1)
|
||||
panicCh := make(chan interface{}, 1)
|
||||
resultCh := make(chan *result, 1)
|
||||
|
||||
go func() {
|
||||
result := &result{}
|
||||
|
||||
// panics don't cross goroutine boundaries, so we have to handle ourselves
|
||||
defer func() {
|
||||
panicReason := recover()
|
||||
if panicReason != nil {
|
||||
reason := recover()
|
||||
if reason != nil {
|
||||
// do not wrap the sentinel ErrAbortHandler panic value
|
||||
if panicReason != http.ErrAbortHandler {
|
||||
if reason != http.ErrAbortHandler {
|
||||
// Same as stdlib http server code. Manually allocate stack
|
||||
// trace buffer size to prevent excessively large logs
|
||||
const size = 64 << 10
|
||||
buf := make([]byte, size)
|
||||
buf = buf[:goruntime.Stack(buf, false)]
|
||||
panicReason = fmt.Sprintf("%v\n%s", panicReason, buf)
|
||||
reason = fmt.Sprintf("%v\n%s", reason, buf)
|
||||
}
|
||||
// Propagate to parent goroutine
|
||||
panicCh <- panicReason
|
||||
|
||||
// store the panic reason into the result.
|
||||
result.reason = reason
|
||||
}
|
||||
|
||||
// Propagate the result to the parent goroutine
|
||||
resultCh <- result
|
||||
}()
|
||||
|
||||
if result, err := fn(); err != nil {
|
||||
errCh <- err
|
||||
if object, err := fn(); err != nil {
|
||||
result.err = err
|
||||
} else {
|
||||
ch <- result
|
||||
result.object = object
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case result = <-ch:
|
||||
if status, ok := result.(*metav1.Status); ok {
|
||||
if status.Status != metav1.StatusSuccess {
|
||||
return nil, errors.FromObject(status)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
case err = <-errCh:
|
||||
return nil, err
|
||||
case p := <-panicCh:
|
||||
panic(p)
|
||||
case result := <-resultCh:
|
||||
return result.Return()
|
||||
case <-ctx.Done():
|
||||
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", ctx.Err()), 0)
|
||||
}
|
||||
|
@ -69,6 +69,13 @@ func TestFinishRequest(t *testing.T) {
|
||||
expectedObj: nil,
|
||||
expectedErr: exampleErr,
|
||||
},
|
||||
{
|
||||
name: "No expected error or object or panic",
|
||||
timeout: timeoutFunc,
|
||||
fn: func() (runtime.Object, error) {
|
||||
return nil, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Successful status object is returned as expected",
|
||||
timeout: timeoutFunc,
|
||||
|
Loading…
Reference in New Issue
Block a user