mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-07 12:11:43 +00:00
Merge pull request #75368 from mfojtik/retry-on-errors
rest: retry on connection refused and apiserver shutdown
This commit is contained in:
@@ -43,7 +43,9 @@ go_library(
|
|||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/filters",
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/filters",
|
||||||
importpath = "k8s.io/apiserver/pkg/server/filters",
|
importpath = "k8s.io/apiserver/pkg/server/filters",
|
||||||
deps = [
|
deps = [
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
@@ -53,6 +55,7 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@@ -18,11 +18,16 @@ package filters
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
||||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
|
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
|
||||||
@@ -38,7 +43,14 @@ func WithWaitGroup(handler http.Handler, longRunning apirequest.LongRunningReque
|
|||||||
|
|
||||||
if !longRunning(req, requestInfo) {
|
if !longRunning(req, requestInfo) {
|
||||||
if err := wg.Add(1); err != nil {
|
if err := wg.Add(1); err != nil {
|
||||||
http.Error(w, "apiserver is shutting down.", http.StatusInternalServerError)
|
// When apiserver is shutting down, signal clients to retry
|
||||||
|
// There is a good chance the client hit a different server, so a tight retry is good for client responsiveness.
|
||||||
|
w.Header().Add("Retry-After", "1")
|
||||||
|
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
|
||||||
|
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||||
|
statusErr := apierrors.NewServiceUnavailable("apiserver is shutting down").Status()
|
||||||
|
w.WriteHeader(int(statusErr.Code))
|
||||||
|
fmt.Fprintln(w, runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &statusErr))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
@@ -806,13 +806,15 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
|
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// "Connection reset by peer" is usually a transient error.
|
// "Connection reset by peer", "Connection refused" or "apiserver is shutting down" are usually a transient errors.
|
||||||
// Thus in case of "GET" operations, we simply retry it.
|
// Thus in case of "GET" operations, we simply retry it.
|
||||||
// We are not automatically retrying "write" operations, as
|
// We are not automatically retrying "write" operations, as
|
||||||
// they are not idempotent.
|
// they are not idempotent.
|
||||||
if !net.IsConnectionReset(err) || r.verb != "GET" {
|
if r.verb != "GET" {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// For connection errors and apiserver shutdown errors retry.
|
||||||
|
if net.IsConnectionReset(err) || net.IsConnectionRefused(err) {
|
||||||
// For the purpose of retry, we set the artificial "retry-after" response.
|
// For the purpose of retry, we set the artificial "retry-after" response.
|
||||||
// TODO: Should we clean the original response if it exists?
|
// TODO: Should we clean the original response if it exists?
|
||||||
resp = &http.Response{
|
resp = &http.Response{
|
||||||
@@ -820,6 +822,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
|
|||||||
Header: http.Header{"Retry-After": []string{"1"}},
|
Header: http.Header{"Retry-After": []string{"1"}},
|
||||||
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done := func() bool {
|
done := func() bool {
|
||||||
|
Reference in New Issue
Block a user