mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #87515 from Sh4d1/proxy_agg
Use network proxy for aggregator api
This commit is contained in:
commit
4c3aa3f26b
@ -62,6 +62,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/proxy:go_default_library",
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/client-go/pkg/version"
|
||||
openapicommon "k8s.io/kube-openapi/pkg/common"
|
||||
@ -133,6 +134,10 @@ type APIAggregator struct {
|
||||
|
||||
// openAPIAggregationController downloads and merges OpenAPI specs.
|
||||
openAPIAggregationController *openapicontroller.AggregationController
|
||||
|
||||
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
|
||||
// overwrites proxyTransport dialer if not nil
|
||||
egressSelector *egressselector.EgressSelector
|
||||
}
|
||||
|
||||
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
|
||||
@ -184,6 +189,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
APIRegistrationInformers: informerFactory,
|
||||
serviceResolver: c.ExtraConfig.ServiceResolver,
|
||||
openAPIConfig: openAPIConfig,
|
||||
egressSelector: c.GenericConfig.EgressSelector,
|
||||
}
|
||||
|
||||
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
|
||||
@ -217,6 +223,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
c.ExtraConfig.ProxyClientCert,
|
||||
c.ExtraConfig.ProxyClientKey,
|
||||
s.serviceResolver,
|
||||
c.GenericConfig.EgressSelector,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -301,6 +308,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
|
||||
proxyClientKey: s.proxyClientKey,
|
||||
proxyTransport: s.proxyTransport,
|
||||
serviceResolver: s.serviceResolver,
|
||||
egressSelector: s.egressSelector,
|
||||
}
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
if s.openAPIAggregationController != nil {
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
genericfeatures "k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
@ -63,6 +64,10 @@ type proxyHandler struct {
|
||||
serviceResolver ServiceResolver
|
||||
|
||||
handlingInfo atomic.Value
|
||||
|
||||
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
|
||||
// overwrites proxyTransport dialer if not nil
|
||||
egressSelector *egressselector.EgressSelector
|
||||
}
|
||||
|
||||
type proxyHandlingInfo struct {
|
||||
@ -259,7 +264,16 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIServ
|
||||
servicePort: *apiService.Spec.Service.Port,
|
||||
serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
|
||||
}
|
||||
if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
|
||||
if r.egressSelector != nil {
|
||||
networkContext := egressselector.Cluster.AsNetworkContext()
|
||||
var egressDialer utilnet.DialFunc
|
||||
egressDialer, err := r.egressSelector.Lookup(networkContext)
|
||||
if err != nil {
|
||||
klog.Warning(err.Error())
|
||||
} else {
|
||||
newInfo.restConfig.Dial = egressDialer
|
||||
}
|
||||
} else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
|
||||
newInfo.restConfig.Dial = r.proxyTransport.DialContext
|
||||
}
|
||||
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
|
||||
|
@ -17,8 +17,10 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
|
@ -31,8 +31,10 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
v1informers "k8s.io/client-go/informers/core/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
@ -90,6 +92,7 @@ func NewAvailableConditionController(
|
||||
proxyClientCert []byte,
|
||||
proxyClientKey []byte,
|
||||
serviceResolver ServiceResolver,
|
||||
egressSelector *egressselector.EgressSelector,
|
||||
) (*AvailableConditionController, error) {
|
||||
c := &AvailableConditionController{
|
||||
apiServiceClient: apiServiceClient,
|
||||
@ -118,9 +121,19 @@ func NewAvailableConditionController(
|
||||
KeyData: proxyClientKey,
|
||||
},
|
||||
}
|
||||
if proxyTransport != nil && proxyTransport.DialContext != nil {
|
||||
|
||||
if egressSelector != nil {
|
||||
networkContext := egressselector.Cluster.AsNetworkContext()
|
||||
var egressDialer utilnet.DialFunc
|
||||
egressDialer, err := egressSelector.Lookup(networkContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
restConfig.Dial = egressDialer
|
||||
} else if proxyTransport != nil && proxyTransport.DialContext != nil {
|
||||
restConfig.Dial = proxyTransport.DialContext
|
||||
}
|
||||
|
||||
transport, err := rest.TransportFor(restConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user