refactor peerproxy_handler and add unit test

This commit is contained in:
Richa Banker 2024-01-24 19:48:51 -08:00
parent 3a0f6c1ea3
commit 9c65b79ea3
2 changed files with 148 additions and 76 deletions

View File

@ -82,12 +82,6 @@ type peerProxyHandler struct {
finishedSync atomic.Bool finishedSync atomic.Bool
} }
type serviceableByResponse struct {
locallyServiceable bool
errorFetchingAddressFromLease bool
peerEndpoints []string
}
// responder implements rest.Responder for assisting a connector in writing objects or errors. // responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct { type responder struct {
w http.ResponseWriter w http.ResponseWriter
@ -149,84 +143,97 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
gvr.Group = "core" gvr.Group = "core"
} }
// find servers that are capable of serving this request apiservers, err := h.findServiceableByServers(gvr)
serviceableByResp, err := h.findServiceableByServers(gvr)
if err != nil { if err != nil {
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is // resource wasn't found in SV informer cache which means that resource is an aggregated API
handler.ServeHTTP(w, r) // or a CR. This situation is ok to be handled by local handler.
return
}
// found the gvr locally, pass request to the next handler in local apiserver
if serviceableByResp.locallyServiceable {
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
return return
} }
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version} locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers)
if serviceableByResp.errorFetchingAddressFromLease { if err != nil {
klog.ErrorS(err, "error fetching ip and port of remote server while proxying") gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r) responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
return return
} }
// no apiservers were found that could serve the request, pass request to // pass request to the next handler if found the gvr locally.
// next handler, that should eventually serve 404
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to // TODO: maintain locally serviceable GVRs somewhere so that we dont have to
// consult the storageversion-informed map for those // consult the storageversion-informed map for those
if len(serviceableByResp.peerEndpoints) == 0 { if locallyServiceable {
handler.ServeHTTP(w, r)
return
}
if len(peerEndpoints) == 0 {
klog.Errorf("gvr %v is not served by anything in this cluster", gvr) klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
handler.ServeHTTP(w, r) handler.ServeHTTP(w, r)
return return
} }
// otherwise, randomly select an apiserver and proxy request to it // otherwise, randomly select an apiserver and proxy request to it
rand := rand.Intn(len(serviceableByResp.peerEndpoints)) rand := rand.Intn(len(peerEndpoints))
destServerHostPort := serviceableByResp.peerEndpoints[rand] destServerHostPort := peerEndpoints[rand]
h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort) h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
}) })
} }
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (serviceableByResponse, error) { func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) {
apiserversi, ok := h.svMap.Load(gvr) apiserversi, ok := h.svMap.Load(gvr)
// no value found for the requested gvr in svMap
if !ok || apiserversi == nil { if !ok || apiserversi == nil {
return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr) return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr)
} }
apiservers := apiserversi.(*sync.Map)
response := serviceableByResponse{} apiservers, _ := apiserversi.(*sync.Map)
return apiservers, nil
}
func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) {
var peerServerEndpoints []string var peerServerEndpoints []string
var locallyServiceable bool
var respErr error
apiservers.Range(func(key, value interface{}) bool { apiservers.Range(func(key, value interface{}) bool {
apiserverKey := key.(string) apiserverKey := key.(string)
if apiserverKey == h.serverId { if apiserverKey == h.serverId {
response.locallyServiceable = true locallyServiceable = true
// stop iteration // stop iteration
return false return false
} }
hostPort, err := h.reconciler.GetEndpoint(apiserverKey) hostPort, err := h.hostportInfo(apiserverKey)
if err != nil { if err != nil {
response.errorFetchingAddressFromLease = true respErr = err
klog.ErrorS(err, "failed to get peer ip from storage lease for server", "serverID", apiserverKey)
// continue with iteration // continue with iteration
return true return true
} }
// check ip format
_, _, err = net.SplitHostPort(hostPort)
if err != nil {
response.errorFetchingAddressFromLease = true
klog.ErrorS(err, "invalid address found for server", "serverID", apiserverKey)
return true
}
peerServerEndpoints = append(peerServerEndpoints, hostPort) peerServerEndpoints = append(peerServerEndpoints, hostPort)
return true return true
}) })
response.peerEndpoints = peerServerEndpoints // reset err if there was atleast one valid peer server found.
return response, nil if len(peerServerEndpoints) > 0 {
respErr = nil
}
return locallyServiceable, peerServerEndpoints, respErr
}
func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) {
hostport, err := h.reconciler.GetEndpoint(apiserverKey)
if err != nil {
return "", err
}
// check ip format
_, _, err = net.SplitHostPort(hostport)
if err != nil {
return "", err
}
return hostport, nil
} }
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) { func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
@ -248,13 +255,11 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
defer cancelFn() defer cancelFn()
proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport) proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport)
delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw} delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
w := responsewriter.WrapForHTTP1Or2(delegate) w := responsewriter.WrapForHTTP1Or2(delegate)
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()}) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
handler.ServeHTTP(w, newReq) handler.ServeHTTP(w, newReq)
// Increment the count of proxied requests
metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status())) metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
} }
@ -280,11 +285,13 @@ func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
klog.Error("Invalid StorageVersion provided to updateSV()") klog.Error("Invalid StorageVersion provided to updateSV()")
return return
} }
newSV, ok := newObj.(*v1alpha1.StorageVersion) newSV, ok := newObj.(*v1alpha1.StorageVersion)
if !ok { if !ok {
klog.Error("Invalid StorageVersion provided to updateSV()") klog.Error("Invalid StorageVersion provided to updateSV()")
return return
} }
h.updateSVMap(oldSV, newSV) h.updateSVMap(oldSV, newSV)
} }
@ -295,17 +302,17 @@ func (h *peerProxyHandler) deleteSV(obj interface{}) {
klog.Error("Invalid StorageVersion provided to deleteSV()") klog.Error("Invalid StorageVersion provided to deleteSV()")
return return
} }
h.updateSVMap(sv, nil) h.updateSVMap(sv, nil)
} }
// Delete old storageversion, add new storagversion // Delete old storageversion, add new storagversion
func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) { func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
if oldSV != nil { if oldSV != nil {
// delete old SV entries
h.deleteSVFromMap(oldSV) h.deleteSVFromMap(oldSV)
} }
if newSV != nil { if newSV != nil {
// add new SV entries
h.addSVToMap(newSV) h.addSVToMap(newSV)
} }
} }

View File

@ -54,19 +54,23 @@ import (
const ( const (
requestTimeout = 30 * time.Second requestTimeout = 30 * time.Second
localServerId = "local-apiserver" localServerID = "local-apiserver"
remoteServerId = "remote-apiserver" remoteServerID = "remote-apiserver"
) )
type FakeSVMapData struct { type FakeSVMapData struct {
gvr schema.GroupVersionResource gvr schema.GroupVersionResource
serverId string serverIDs []string
}
type server struct {
publicIP string
serverID string
} }
type reconciler struct { type reconciler struct {
do bool do bool
publicIP string servers []server
serverId string
} }
func TestPeerProxy(t *testing.T) { func TestPeerProxy(t *testing.T) {
@ -116,7 +120,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core", Group: "core",
Version: "bar", Version: "bar",
Resource: "baz"}, Resource: "baz"},
serverId: ""}, serverIDs: []string{}},
}, },
{ {
desc: "503 if no endpoint fetched from lease", desc: "503 if no endpoint fetched from lease",
@ -128,7 +132,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core", Group: "core",
Version: "foo", Version: "foo",
Resource: "bar"}, Resource: "bar"},
serverId: remoteServerId}, serverIDs: []string{remoteServerID}},
}, },
{ {
desc: "200 if locally serviceable", desc: "200 if locally serviceable",
@ -140,7 +144,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core", Group: "core",
Version: "foo", Version: "foo",
Resource: "bar"}, Resource: "bar"},
serverId: localServerId}, serverIDs: []string{localServerID}},
}, },
{ {
desc: "503 unreachable peer bind address", desc: "503 unreachable peer bind address",
@ -152,11 +156,15 @@ func TestPeerProxy(t *testing.T) {
Group: "core", Group: "core",
Version: "foo", Version: "foo",
Resource: "bar"}, Resource: "bar"},
serverId: remoteServerId}, serverIDs: []string{remoteServerID}},
reconcilerConfig: reconciler{ reconcilerConfig: reconciler{
do: true, do: true,
publicIP: "1.2.3.4", servers: []server{
serverId: remoteServerId, {
publicIP: "1.2.3.4",
serverID: remoteServerID,
},
},
}, },
metrics: []string{ metrics: []string{
"apiserver_rerouted_request_total", "apiserver_rerouted_request_total",
@ -177,11 +185,15 @@ func TestPeerProxy(t *testing.T) {
Group: "core", Group: "core",
Version: "foo", Version: "foo",
Resource: "bar"}, Resource: "bar"},
serverId: remoteServerId}, serverIDs: []string{remoteServerID}},
reconcilerConfig: reconciler{ reconcilerConfig: reconciler{
do: true, do: true,
publicIP: "1.2.3.4", servers: []server{
serverId: remoteServerId, {
publicIP: "1.2.3.4",
serverID: remoteServerID,
},
},
}, },
metrics: []string{ metrics: []string{
"apiserver_rerouted_request_total", "apiserver_rerouted_request_total",
@ -192,6 +204,52 @@ func TestPeerProxy(t *testing.T) {
apiserver_rerouted_request_total{code="503"} 2 apiserver_rerouted_request_total{code="503"} 2
`, `,
}, },
{
desc: "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIDs: []string{"aggregated-apiserver", remoteServerID}},
reconcilerConfig: reconciler{
do: true,
servers: []server{
{
publicIP: "1.2.3.4",
serverID: remoteServerID,
},
},
},
},
{
desc: "503 if all peers had invalid host:port info",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIDs: []string{"aggregated-apiserver", remoteServerID}},
reconcilerConfig: reconciler{
do: true,
servers: []server{
{
publicIP: "1[2.4",
serverID: "aggregated-apiserver",
},
{
publicIP: "2.4]6",
serverID: remoteServerID,
},
},
},
},
} }
metrics.Register() metrics.Register()
@ -210,10 +268,15 @@ func TestPeerProxy(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
reconciler.UpdateLease(tt.reconcilerConfig.serverId, for _, server := range tt.reconcilerConfig.servers {
tt.reconcilerConfig.publicIP, err := reconciler.UpdateLease(server.serverID,
[]corev1.EndpointPort{{Name: "foo", server.publicIP,
Port: 8080, Protocol: "TCP"}}) []corev1.EndpointPort{{Name: "foo",
Port: 8080, Protocol: "TCP"}})
if err != nil {
t.Fatalf("failed to update peer endpoint lease - %v", err)
}
}
} }
req, err := http.NewRequest(http.MethodGet, server.URL+tt.requestPath, nil) req, err := http.NewRequest(http.MethodGet, server.URL+tt.requestPath, nil)
@ -261,7 +324,7 @@ func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseRe
func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler { func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler {
// Add peerproxy handler // Add peerproxy handler
s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, svdata, localServerId, s) peerProxyHandler, err := newFakePeerProxyHandler(reconciler, svdata, localServerID, s)
if err != nil { if err != nil {
t.Fatalf("Error creating peer proxy handler: %v", err) t.Fatalf("Error creating peer proxy handler: %v", err)
} }
@ -277,7 +340,7 @@ func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.
return handler return handler
} }
func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) { func newFakePeerProxyHandler(reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) {
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0) informerFactory := informers.NewSharedInformerFactory(clientset, 0)
clientConfig := &transport.Config{ clientConfig := &transport.Config{
@ -290,16 +353,18 @@ func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.P
} }
ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s) ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
if testDataExists(svdata.gvr) { if testDataExists(svdata.gvr) {
ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId) ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIDs)
} }
return ppI, nil return ppI, nil
} }
func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) { func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIDs []string) {
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{}) apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
apiservers := apiserversi.(*sync.Map) apiservers := apiserversi.(*sync.Map)
if serverId != "" { for _, serverID := range serverIDs {
apiservers.Store(serverId, true) if serverID != "" {
apiservers.Store(serverID, true)
}
} }
} }