mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 17:30:00 +00:00
commit
cf870bb7fc
@ -150,7 +150,7 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// find servers that are capable of serving this request
|
// find servers that are capable of serving this request
|
||||||
serviceableByResp, err := h.findServiceableByServers(gvr, h.serverId, h.reconciler)
|
serviceableByResp, err := h.findServiceableByServers(gvr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
|
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
@ -163,7 +163,6 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
|
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
|
||||||
|
|
||||||
if serviceableByResp.errorFetchingAddressFromLease {
|
if serviceableByResp.errorFetchingAddressFromLease {
|
||||||
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
|
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
|
||||||
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
|
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
|
||||||
@ -176,7 +175,7 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
|
|||||||
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
|
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
|
||||||
// consult the storageversion-informed map for those
|
// consult the storageversion-informed map for those
|
||||||
if len(serviceableByResp.peerEndpoints) == 0 {
|
if len(serviceableByResp.peerEndpoints) == 0 {
|
||||||
klog.Error(fmt.Sprintf("GVR %v is not served by anything in this cluster", gvr))
|
klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -189,7 +188,7 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource, localAPIServerId string, reconciler reconcilers.PeerEndpointLeaseReconciler) (serviceableByResponse, error) {
|
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (serviceableByResponse, error) {
|
||||||
|
|
||||||
apiserversi, ok := h.svMap.Load(gvr)
|
apiserversi, ok := h.svMap.Load(gvr)
|
||||||
|
|
||||||
@ -202,17 +201,16 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
|
|||||||
var peerServerEndpoints []string
|
var peerServerEndpoints []string
|
||||||
apiservers.Range(func(key, value interface{}) bool {
|
apiservers.Range(func(key, value interface{}) bool {
|
||||||
apiserverKey := key.(string)
|
apiserverKey := key.(string)
|
||||||
if apiserverKey == localAPIServerId {
|
if apiserverKey == h.serverId {
|
||||||
response.errorFetchingAddressFromLease = true
|
|
||||||
response.locallyServiceable = true
|
response.locallyServiceable = true
|
||||||
// stop iteration
|
// stop iteration
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
hostPort, err := reconciler.GetEndpoint(apiserverKey)
|
hostPort, err := h.reconciler.GetEndpoint(apiserverKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.errorFetchingAddressFromLease = true
|
response.errorFetchingAddressFromLease = true
|
||||||
klog.Errorf("failed to get peer ip from storage lease for server %s", apiserverKey)
|
klog.ErrorS(err, "failed to get peer ip from storage lease for server", "serverID", apiserverKey)
|
||||||
// continue with iteration
|
// continue with iteration
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -220,12 +218,10 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
|
|||||||
_, _, err = net.SplitHostPort(hostPort)
|
_, _, err = net.SplitHostPort(hostPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.errorFetchingAddressFromLease = true
|
response.errorFetchingAddressFromLease = true
|
||||||
klog.Errorf("invalid address found for server %s", apiserverKey)
|
klog.ErrorS(err, "invalid address found for server", "serverID", apiserverKey)
|
||||||
// continue with iteration
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
peerServerEndpoints = append(peerServerEndpoints, hostPort)
|
peerServerEndpoints = append(peerServerEndpoints, hostPort)
|
||||||
// continue with iteration
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -236,7 +232,7 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
|
|||||||
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
|
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
|
||||||
user, ok := apirequest.UserFrom(req.Context())
|
user, ok := apirequest.UserFrom(req.Context())
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("failed to get user info from request")
|
klog.Error("failed to get user info from request")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -263,7 +259,7 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
||||||
klog.Errorf("Error while proxying request to destination apiserver: %v", err)
|
klog.ErrorS(err, "Error while proxying request to destination apiserver")
|
||||||
http.Error(w, err.Error(), http.StatusServiceUnavailable)
|
http.Error(w, err.Error(), http.StatusServiceUnavailable)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -271,7 +267,7 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
|
|||||||
func (h *peerProxyHandler) addSV(obj interface{}) {
|
func (h *peerProxyHandler) addSV(obj interface{}) {
|
||||||
sv, ok := obj.(*v1alpha1.StorageVersion)
|
sv, ok := obj.(*v1alpha1.StorageVersion)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Invalid StorageVersion provided to addSV()")
|
klog.Error("Invalid StorageVersion provided to addSV()")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.updateSVMap(nil, sv)
|
h.updateSVMap(nil, sv)
|
||||||
@ -281,12 +277,12 @@ func (h *peerProxyHandler) addSV(obj interface{}) {
|
|||||||
func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
|
func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
|
||||||
oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
|
oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Invalid StorageVersion provided to updateSV()")
|
klog.Error("Invalid StorageVersion provided to updateSV()")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
newSV, ok := newObj.(*v1alpha1.StorageVersion)
|
newSV, ok := newObj.(*v1alpha1.StorageVersion)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Invalid StorageVersion provided to updateSV()")
|
klog.Error("Invalid StorageVersion provided to updateSV()")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.updateSVMap(oldSV, newSV)
|
h.updateSVMap(oldSV, newSV)
|
||||||
@ -296,7 +292,7 @@ func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
|
|||||||
func (h *peerProxyHandler) deleteSV(obj interface{}) {
|
func (h *peerProxyHandler) deleteSV(obj interface{}) {
|
||||||
sv, ok := obj.(*v1alpha1.StorageVersion)
|
sv, ok := obj.(*v1alpha1.StorageVersion)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Invalid StorageVersion provided to deleteSV()")
|
klog.Error("Invalid StorageVersion provided to deleteSV()")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h.updateSVMap(sv, nil)
|
h.updateSVMap(sv, nil)
|
||||||
|
Loading…
Reference in New Issue
Block a user