add local option to APIService

This commit is contained in:
deads2k 2017-03-08 13:55:38 -05:00
parent 7b4bec038c
commit b27de102cb
14 changed files with 199 additions and 155 deletions

View File

@ -31,7 +31,8 @@ generated_files=($(
\( \
-wholename './output' \
-o -wholename './_output' \
-o -wholename './staging' \
-o -wholename './staging/src/k8s.io/client-go' \
-o -wholename './staging/src/k8s.io/apiserver' \
-o -wholename './release' \
-o -wholename './target' \
-o -wholename '*/third_party/*' \

View File

@ -39,7 +39,9 @@ type ServiceReference struct {
type APIServiceSpec struct {
// Service is a reference to the service for this API server. It must communicate
// on port 443
Service ServiceReference
// If the Service is nil, that means the handling for the API groupversion is handled locally on this server.
// The call will simply delegate to the normal handler chain to be fulfilled.
Service *ServiceReference
// Group is the API group name this server hosts
Group string
// Version is the API version this server hosts. For example, "v1"

View File

@ -713,20 +713,26 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
yy4 := &x.Service
yy4.CodecEncodeSelf(e)
if x.Service == nil {
r.EncodeNil()
} else {
x.Service.CodecEncodeSelf(e)
}
} else {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("service"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yy6 := &x.Service
yy6.CodecEncodeSelf(e)
if x.Service == nil {
r.EncodeNil()
} else {
x.Service.CodecEncodeSelf(e)
}
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
if yyq2[1] {
yym9 := z.EncBinary()
_ = yym9
yym7 := z.EncBinary()
_ = yym7
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Group))
@ -739,8 +745,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("group"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym10 := z.EncBinary()
_ = yym10
yym8 := z.EncBinary()
_ = yym8
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Group))
@ -750,8 +756,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
if yyq2[2] {
yym12 := z.EncBinary()
_ = yym12
yym10 := z.EncBinary()
_ = yym10
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Version))
@ -764,8 +770,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("version"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym13 := z.EncBinary()
_ = yym13
yym11 := z.EncBinary()
_ = yym11
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Version))
@ -775,8 +781,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
if yyq2[3] {
yym15 := z.EncBinary()
_ = yym15
yym13 := z.EncBinary()
_ = yym13
if false {
} else {
r.EncodeBool(bool(x.InsecureSkipTLSVerify))
@ -789,8 +795,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("insecureSkipTLSVerify"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym16 := z.EncBinary()
_ = yym16
yym14 := z.EncBinary()
_ = yym14
if false {
} else {
r.EncodeBool(bool(x.InsecureSkipTLSVerify))
@ -802,8 +808,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
if x.CABundle == nil {
r.EncodeNil()
} else {
yym18 := z.EncBinary()
_ = yym18
yym16 := z.EncBinary()
_ = yym16
if false {
} else {
r.EncodeStringBytes(codecSelferC_RAW1234, []byte(x.CABundle))
@ -816,8 +822,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
if x.CABundle == nil {
r.EncodeNil()
} else {
yym19 := z.EncBinary()
_ = yym19
yym17 := z.EncBinary()
_ = yym17
if false {
} else {
r.EncodeStringBytes(codecSelferC_RAW1234, []byte(x.CABundle))
@ -826,8 +832,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
yym21 := z.EncBinary()
_ = yym21
yym19 := z.EncBinary()
_ = yym19
if false {
} else {
r.EncodeInt(int64(x.Priority))
@ -836,8 +842,8 @@ func (x *APIServiceSpec) CodecEncodeSelf(e *codec1978.Encoder) {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("priority"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym22 := z.EncBinary()
_ = yym22
yym20 := z.EncBinary()
_ = yym20
if false {
} else {
r.EncodeInt(int64(x.Priority))
@ -906,10 +912,14 @@ func (x *APIServiceSpec) codecDecodeSelfFromMap(l int, d *codec1978.Decoder) {
switch yys3 {
case "service":
if r.TryDecodeAsNil() {
x.Service = ServiceReference{}
if x.Service != nil {
x.Service = nil
}
} else {
yyv4 := &x.Service
yyv4.CodecDecodeSelf(d)
if x.Service == nil {
x.Service = new(ServiceReference)
}
x.Service.CodecDecodeSelf(d)
}
case "group":
if r.TryDecodeAsNil() {
@ -997,10 +1007,14 @@ func (x *APIServiceSpec) codecDecodeSelfFromArray(l int, d *codec1978.Decoder) {
}
z.DecSendContainerState(codecSelfer_containerArrayElem1234)
if r.TryDecodeAsNil() {
x.Service = ServiceReference{}
if x.Service != nil {
x.Service = nil
}
} else {
yyv16 := &x.Service
yyv16.CodecDecodeSelf(d)
if x.Service == nil {
x.Service = new(ServiceReference)
}
x.Service.CodecDecodeSelf(d)
}
yyj15++
if yyhl15 {
@ -1679,7 +1693,7 @@ func (x codecSelfer1234) decSliceAPIService(v *[]APIService, d *codec1978.Decode
yyrg1 := len(yyv1) > 0
yyv21 := yyv1
yyrl1, yyrt1 = z.DecInferLen(yyl1, z.DecBasicHandle().MaxInitLen, 368)
yyrl1, yyrt1 = z.DecInferLen(yyl1, z.DecBasicHandle().MaxInitLen, 344)
if yyrt1 {
if yyrl1 <= cap(yyv1) {
yyv1 = yyv1[:yyrl1]

View File

@ -39,7 +39,9 @@ type ServiceReference struct {
type APIServiceSpec struct {
// Service is a reference to the service for this API server. It must communicate
// on port 443
Service ServiceReference `json:"service" protobuf:"bytes,1,opt,name=service"`
// If the Service is nil, that means the handling for the API groupversion is handled locally on this server.
// The call will simply delegate to the normal handler chain to be fulfilled.
Service *ServiceReference `json:"service" protobuf:"bytes,1,opt,name=service"`
// Group is the API group name this server hosts
Group string `json:"group,omitempty" protobuf:"bytes,2,opt,name=group"`
// Version is the API version this server hosts. For example, "v1"

View File

@ -99,9 +99,7 @@ func Convert_apiregistration_APIServiceList_To_v1alpha1_APIServiceList(in *apire
}
func autoConvert_v1alpha1_APIServiceSpec_To_apiregistration_APIServiceSpec(in *APIServiceSpec, out *apiregistration.APIServiceSpec, s conversion.Scope) error {
if err := Convert_v1alpha1_ServiceReference_To_apiregistration_ServiceReference(&in.Service, &out.Service, s); err != nil {
return err
}
out.Service = (*apiregistration.ServiceReference)(unsafe.Pointer(in.Service))
out.Group = in.Group
out.Version = in.Version
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify
@ -115,9 +113,7 @@ func Convert_v1alpha1_APIServiceSpec_To_apiregistration_APIServiceSpec(in *APISe
}
func autoConvert_apiregistration_APIServiceSpec_To_v1alpha1_APIServiceSpec(in *apiregistration.APIServiceSpec, out *APIServiceSpec, s conversion.Scope) error {
if err := Convert_apiregistration_ServiceReference_To_v1alpha1_ServiceReference(&in.Service, &out.Service, s); err != nil {
return err
}
out.Service = (*ServiceReference)(unsafe.Pointer(in.Service))
out.Group = in.Group
out.Version = in.Version
out.InsecureSkipTLSVerify = in.InsecureSkipTLSVerify

View File

@ -83,6 +83,11 @@ func DeepCopy_v1alpha1_APIServiceSpec(in interface{}, out interface{}, c *conver
in := in.(*APIServiceSpec)
out := out.(*APIServiceSpec)
*out = *in
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(ServiceReference)
**out = **in
}
if in.CABundle != nil {
in, out := &in.CABundle, &out.CABundle
*out = make([]byte, len(*in))

View File

@ -27,10 +27,10 @@ import (
discoveryapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
)
func ValidateAPIService(apiServer *discoveryapi.APIService) field.ErrorList {
requiredName := apiServer.Spec.Version + "." + apiServer.Spec.Group
func ValidateAPIService(apiService *discoveryapi.APIService) field.ErrorList {
requiredName := apiService.Spec.Version + "." + apiService.Spec.Group
allErrs := validation.ValidateObjectMeta(&apiServer.ObjectMeta, false,
allErrs := validation.ValidateObjectMeta(&apiService.ObjectMeta, false,
func(name string, prefix bool) []string {
if minimalFailures := path.IsValidPathSegmentName(name); len(minimalFailures) > 0 {
return minimalFailures
@ -45,33 +45,42 @@ func ValidateAPIService(apiServer *discoveryapi.APIService) field.ErrorList {
field.NewPath("metadata"))
// in this case we allow empty group
if len(apiServer.Spec.Group) == 0 && apiServer.Spec.Version != "v1" {
if len(apiService.Spec.Group) == 0 && apiService.Spec.Version != "v1" {
allErrs = append(allErrs, field.Required(field.NewPath("spec", "group"), "only v1 may have an empty group and it better be legacy kube"))
}
if len(apiServer.Spec.Group) > 0 {
for _, errString := range utilvalidation.IsDNS1123Subdomain(apiServer.Spec.Group) {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "group"), apiServer.Spec.Group, errString))
if len(apiService.Spec.Group) > 0 {
for _, errString := range utilvalidation.IsDNS1123Subdomain(apiService.Spec.Group) {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "group"), apiService.Spec.Group, errString))
}
}
for _, errString := range utilvalidation.IsDNS1035Label(apiServer.Spec.Version) {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "version"), apiServer.Spec.Version, errString))
for _, errString := range utilvalidation.IsDNS1035Label(apiService.Spec.Version) {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "version"), apiService.Spec.Version, errString))
}
if apiServer.Spec.Priority <= 0 || apiServer.Spec.Priority > 1000 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "priority"), apiServer.Spec.Priority, "priority must be positive and less than 1000"))
if apiService.Spec.Priority <= 0 || apiService.Spec.Priority > 1000 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "priority"), apiService.Spec.Priority, "priority must be positive and less than 1000"))
}
if len(apiServer.Spec.Service.Namespace) == 0 {
if apiService.Spec.Service == nil {
if len(apiService.Spec.CABundle) != 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "caBundle"), fmt.Sprintf("%d bytes", len(apiService.Spec.CABundle)), "local APIServices may not have a caBundle"))
}
if apiService.Spec.InsecureSkipTLSVerify {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "insecureSkipTLSVerify"), apiService.Spec.InsecureSkipTLSVerify, "local APIServices may not have insecureSkipTLSVerify"))
}
return allErrs
}
if len(apiService.Spec.Service.Namespace) == 0 {
allErrs = append(allErrs, field.Required(field.NewPath("spec", "service", "namespace"), ""))
}
if len(apiServer.Spec.Service.Name) == 0 {
if len(apiService.Spec.Service.Name) == 0 {
allErrs = append(allErrs, field.Required(field.NewPath("spec", "service", "name"), ""))
}
if apiServer.Spec.InsecureSkipTLSVerify && len(apiServer.Spec.CABundle) > 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "insecureSkipTLSVerify"), apiServer.Spec.InsecureSkipTLSVerify, "may not be true if caBundle is present"))
if apiService.Spec.InsecureSkipTLSVerify && len(apiService.Spec.CABundle) > 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "insecureSkipTLSVerify"), apiService.Spec.InsecureSkipTLSVerify, "may not be true if caBundle is present"))
}
return allErrs

View File

@ -83,6 +83,11 @@ func DeepCopy_apiregistration_APIServiceSpec(in interface{}, out interface{}, c
in := in.(*APIServiceSpec)
out := out.(*APIServiceSpec)
*out = *in
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(ServiceReference)
**out = **in
}
if in.CABundle != nil {
in, out := &in.CABundle, &out.CABundle
*out = make([]byte, len(*in))

View File

@ -62,6 +62,8 @@ type APIAggregator struct {
contextMapper genericapirequest.RequestContextMapper
handlerChainConfig *handlerChainConfig
// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
// this to confirm the proxy's identity
proxyClientCert []byte
@ -114,13 +116,14 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
proxyMux := http.NewServeMux()
// most API servers don't need to do this, but we need a custom handler chain to handle the special /apis handling here
c.Config.GenericConfig.BuildHandlerChainsFunc = (&handlerChainConfig{
handlerChainConfig := &handlerChainConfig{
informers: informerFactory,
proxyMux: proxyMux,
serviceLister: kubeInformers.Core().V1().Services().Lister(),
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
}).handlerChain
}
// most API servers don't need to do this, but we need a custom handler chain to handle the special /apis handling here
c.Config.GenericConfig.BuildHandlerChainsFunc = handlerChainConfig.handlerChain
genericServer, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
if err != nil {
@ -128,16 +131,17 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
}
s := &APIAggregator{
GenericAPIServer: genericServer,
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
serviceLister: kubeInformers.Core().V1().Services().Lister(),
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
proxyMux: proxyMux,
GenericAPIServer: genericServer,
contextMapper: c.GenericConfig.RequestContextMapper,
handlerChainConfig: handlerChainConfig,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
serviceLister: kubeInformers.Core().V1().Services().Lister(),
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
proxyMux: proxyMux,
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, api.Registry, api.Scheme, api.ParameterCodec, api.Codecs)
@ -171,6 +175,11 @@ type handlerChainConfig struct {
proxyMux *http.ServeMux
serviceLister v1listers.ServiceLister
endpointsLister v1listers.EndpointsLister
// fallThroughHandler keeps track of the handler that comes after the proxy so that it can be used
// to satisfy delegation cases. It is set as a side-effect while building the handler chain.
// TODO refactor to run the proxy *after* authorization so we can simply run at the end of the handling chain
fallThroughHandler http.Handler
}
// handlerChain is a method to build the handler chain for this API server. We need a custom handler chain so that we
@ -181,6 +190,7 @@ func (h *handlerChainConfig) handlerChain(apiHandler http.Handler, c *genericapi
handler := WithAPIs(apiHandler, h.informers.Apiregistration().InternalVersion().APIServices(), h.serviceLister, h.endpointsLister)
handler = genericapifilters.WithAuthorization(handler, c.RequestContextMapper, c.Authorizer)
h.fallThroughHandler = handler
// this mux is NOT protected by authorization, but DOES have authentication information
// this is so that everyone can hit the proxy and we can properly identify the user. The backing
@ -220,11 +230,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
// register the proxy handler
proxyHandler := &proxyHandler{
contextMapper: s.contextMapper,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
transportBuildingError: nil,
proxyRoundTripper: nil,
contextMapper: s.contextMapper,
localDelegate: s.handlerChainConfig.fallThroughHandler,
proxyClientCert: s.proxyClientCert,
proxyClientKey: s.proxyClientKey,
}
proxyHandler.updateAPIService(apiService)
s.proxyHandlers[apiService.Name] = proxyHandler

View File

@ -125,25 +125,27 @@ func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService, se
var discoveryGroup *metav1.APIGroup
for _, apiService := range apiServicesByGroup {
// skip any API services without actual services
if _, err := serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name); err != nil {
continue
}
hasActiveEndpoints := false
endpoints, err := endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
// skip any API services without endpoints
if err != nil {
continue
}
for _, subset := range endpoints.Subsets {
if len(subset.Addresses) > 0 {
hasActiveEndpoints = true
break
if apiService.Spec.Service != nil {
// skip any API services without actual services
if _, err := serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name); err != nil {
continue
}
hasActiveEndpoints := false
endpoints, err := endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
// skip any API services without endpoints
if err != nil {
continue
}
for _, subset := range endpoints.Subsets {
if len(subset.Addresses) > 0 {
hasActiveEndpoints = true
break
}
}
if !hasActiveEndpoints {
continue
}
}
if !hasActiveEndpoints {
continue
}
// the first APIService which is valid becomes the default

View File

@ -115,7 +115,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -127,7 +127,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -176,7 +176,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -188,7 +188,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -200,7 +200,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -212,7 +212,7 @@ func TestAPIs(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -356,7 +356,7 @@ func TestAPIGroup(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -368,7 +368,7 @@ func TestAPIGroup(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -380,7 +380,7 @@ func TestAPIGroup(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},
@ -392,7 +392,7 @@ func TestAPIGroup(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
Spec: apiregistration.APIServiceSpec{
Service: apiregistration.ServiceReference{
Service: &apiregistration.ServiceReference{
Namespace: "ns",
Name: "api",
},

View File

@ -19,7 +19,7 @@ package apiserver
import (
"net/http"
"net/url"
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
@ -37,13 +37,21 @@ import (
type proxyHandler struct {
contextMapper genericapirequest.RequestContextMapper
// localDelegate is used to satisfy local APIServices
localDelegate http.Handler
// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
// this to confirm the proxy's identity
proxyClientCert []byte
proxyClientKey []byte
// lock protects us for updates.
lock sync.RWMutex
handlingInfo atomic.Value
}
type proxyHandlingInfo struct {
// local indicates that this APIService is locally satisfied
local bool
// restConfig holds the information for building a roundtripper
restConfig *restclient.Config
// transportBuildingError is an error produced while building the transport. If this
@ -56,11 +64,17 @@ type proxyHandler struct {
}
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
proxyRoundTripper, err := r.getRoundTripper()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
handlingInfo := r.handlingInfo.Load().(proxyHandlingInfo)
if handlingInfo.local {
r.localDelegate.ServeHTTP(w, req)
return
}
if handlingInfo.transportBuildingError != nil {
http.Error(w, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
return
}
proxyRoundTripper := handlingInfo.proxyRoundTripper
if proxyRoundTripper == nil {
http.Error(w, "", http.StatusNotFound)
return
@ -80,7 +94,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
location.Host = r.getDestinationHost()
location.Host = handlingInfo.destinationHost
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
@ -98,7 +112,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
upgrade := false
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err = r.maybeWrapForConnectionUpgrades(proxyRoundTripper, req)
proxyRoundTripper, upgrade, err = maybeWrapForConnectionUpgrades(handlingInfo.restConfig, proxyRoundTripper, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -118,19 +132,18 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
func (r *proxyHandler) maybeWrapForConnectionUpgrades(rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
connectionHeader := req.Header.Get("Connection")
if len(connectionHeader) == 0 {
return rt, false, nil
}
cfg := r.getRESTConfig()
tlsConfig, err := restclient.TLSConfigFor(cfg)
tlsConfig, err := restclient.TLSConfigFor(restConfig)
if err != nil {
return nil, true, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig)
wrappedRT, err := restclient.HTTPWrappersForConfig(cfg, upgradeRoundTripper)
wrappedRT, err := restclient.HTTPWrappersForConfig(restConfig, upgradeRoundTripper)
if err != nil {
return nil, true, err
}
@ -164,47 +177,26 @@ func (r *responder) Error(err error) {
// these methods provide locked access to fields
func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIService) {
r.lock.Lock()
defer r.lock.Unlock()
if apiService.Spec.Service == nil {
r.handlingInfo.Store(proxyHandlingInfo{local: true})
return
}
r.transportBuildingError = nil
r.proxyRoundTripper = nil
r.destinationHost = apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc"
r.restConfig = &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
CertData: r.proxyClientCert,
KeyData: r.proxyClientKey,
CAData: apiService.Spec.CABundle,
newInfo := proxyHandlingInfo{
destinationHost: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
restConfig: &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify,
CertData: r.proxyClientCert,
KeyData: r.proxyClientKey,
CAData: apiService.Spec.CABundle,
},
},
}
r.proxyRoundTripper, r.transportBuildingError = restclient.TransportFor(r.restConfig)
newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
r.handlingInfo.Store(newInfo)
}
func (r *proxyHandler) removeAPIService() {
r.lock.Lock()
defer r.lock.Unlock()
r.transportBuildingError = nil
r.proxyRoundTripper = nil
}
func (r *proxyHandler) getRoundTripper() (http.RoundTripper, error) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.proxyRoundTripper, r.transportBuildingError
}
func (r *proxyHandler) getDestinationHost() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.destinationHost
}
func (r *proxyHandler) getRESTConfig() *restclient.Config {
r.lock.RLock()
defer r.lock.RUnlock()
return r.restConfig
r.handlingInfo.Store(proxyHandlingInfo{})
}

View File

@ -83,7 +83,9 @@ func TestProxyHandler(t *testing.T) {
targetServer := httptest.NewTLSServer(target)
defer targetServer.Close()
handler := &proxyHandler{}
handler := &proxyHandler{
localDelegate: http.NewServeMux(),
}
server := httptest.NewServer(handler)
defer server.Close()
@ -105,6 +107,7 @@ func TestProxyHandler(t *testing.T) {
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{},
Group: "foo",
Version: "v1",
},
@ -121,6 +124,7 @@ func TestProxyHandler(t *testing.T) {
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{},
Group: "foo",
Version: "v1",
InsecureSkipTLSVerify: true,
@ -146,6 +150,7 @@ func TestProxyHandler(t *testing.T) {
apiService: &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
Spec: apiregistration.APIServiceSpec{
Service: &apiregistration.ServiceReference{},
Group: "foo",
Version: "v1",
},
@ -160,7 +165,9 @@ func TestProxyHandler(t *testing.T) {
handler.removeAPIService()
if tc.apiService != nil {
handler.updateAPIService(tc.apiService)
handler.destinationHost = targetServer.Listener.Addr().String()
curr := handler.handlingInfo.Load().(proxyHandlingInfo)
curr.destinationHost = targetServer.Listener.Addr().String()
handler.handlingInfo.Store(curr)
}
resp, err := http.Get(server.URL + tc.path)

View File

@ -302,7 +302,7 @@ func TestAggregatedAPIServer(t *testing.T) {
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.k8s.io"},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Service: &apiregistrationv1alpha1.ServiceReference{
Namespace: "kube-wardle",
Name: "api",
},
@ -326,7 +326,7 @@ func TestAggregatedAPIServer(t *testing.T) {
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Service: &apiregistrationv1alpha1.ServiceReference{
Namespace: "default",
Name: "kubernetes",
},