diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index d64929577f8..b5fdb06f5b9 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -443,6 +443,9 @@ func buildGenericConfig( if lastErr != nil { return } + if genericConfig.EgressSelector != nil { + storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup + } if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return } diff --git a/pkg/kubelet/client/BUILD b/pkg/kubelet/client/BUILD index 36b6a282919..ffb2434be55 100644 --- a/pkg/kubelet/client/BUILD +++ b/pkg/kubelet/client/BUILD @@ -16,7 +16,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/transport:go_default_library", ], diff --git a/pkg/kubelet/client/kubelet_client.go b/pkg/kubelet/client/kubelet_client.go index 3cb0cf96072..accfe763dde 100644 --- a/pkg/kubelet/client/kubelet_client.go +++ b/pkg/kubelet/client/kubelet_client.go @@ -27,7 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/egressselector" restclient "k8s.io/client-go/rest" "k8s.io/client-go/transport" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -60,7 +60,7 @@ type KubeletClientConfig struct { Dial utilnet.DialFunc // Lookup will give us a dialer if the egress selector is configured for it - Lookup server.EgressSelectorLookup + Lookup egressselector.Lookup } // ConnectionInfo provides the information needed to connect to a kubelet @@ -88,7 +88,7 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) { if dialer == nil && config.Lookup != nil { // Assuming EgressSelector if SSHTunnel is not turned on. // We will not get a dialer if egress selector is disabled. - networkContext := server.NetworkContext{EgressSelectionName: server.Cluster} + networkContext := egressselector.Cluster.AsNetworkContext() dialer, err = config.Lookup(networkContext) if err != nil { return nil, fmt.Errorf("failed to get context dialer for 'cluster': got %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 89b071086c9..b7d06e11fb8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -11,7 +11,6 @@ go_test( srcs = [ "config_selfclient_test.go", "config_test.go", - "egress_selector_test.go", "genericapiserver_test.go", "healthz_test.go", ], @@ -23,11 +22,9 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", @@ -53,7 +50,6 @@ go_library( "config_selfclient.go", "deprecated_insecure_serving.go", "doc.go", - "egress_selector.go", "genericapiserver.go", "handler.go", "healthz.go", @@ -75,7 +71,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", @@ -85,7 +80,6 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library", @@ -104,6 +98,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 77c8d122a31..71d77a7b40f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -55,6 +55,7 @@ import ( apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" apirequest "k8s.io/apiserver/pkg/endpoints/request" genericregistry "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/server/egressselector" genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" @@ -97,7 +98,7 @@ type Config struct { // EgressSelector provides a lookup mechanism for dialing outbound connections. // It does so based on a EgressSelectorConfiguration which was read at startup. - EgressSelector *EgressSelector + EgressSelector *egressselector.EgressSelector // RuleResolver is required to get the list of rules that apply to a given user // in a given namespace diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD b/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD index 8df8af8ac63..7c0e2faad7d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD @@ -2,16 +2,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["config.go"], + srcs = [ + "config.go", + "egress_selector.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector", importpath = "k8s.io/apiserver/pkg/server/egressselector", visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library", + "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/path:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], @@ -33,10 +38,14 @@ filegroup( go_test( name = "go_default_test", - srcs = ["config_test.go"], + srcs = [ + "config_test.go", + "egress_selector_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/egress_selector.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go similarity index 91% rename from staging/src/k8s.io/apiserver/pkg/server/egress_selector.go rename to staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go index 58a9dd320ad..d043e1392cd 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egress_selector.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package server +package egressselector import ( "bufio" @@ -34,6 +34,7 @@ import ( var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext +// EgressSelector is the map of network context type to context dialer, for network egress. type EgressSelector struct { egressToDialer map[EgressType]utilnet.DialFunc } @@ -59,9 +60,10 @@ type NetworkContext struct { EgressSelectionName EgressType } -// EgressSelectorLookup is the interface to get the dialer function for the network context. -type EgressSelectorLookup func(networkContext NetworkContext) (utilnet.DialFunc, error) +// Lookup is the interface to get the dialer function for the network context. +type Lookup func(networkContext NetworkContext) (utilnet.DialFunc, error) +// String returns the canonical string representation of the egress type func (s EgressType) String() string { switch s { case Master: @@ -75,6 +77,11 @@ func (s EgressType) String() string { } } +// AsNetworkContext is a helper function to make it easy to get the basic NetworkContext objects. +func (s EgressType) AsNetworkContext() NetworkContext { + return NetworkContext{EgressSelectionName: s} +} + func lookupServiceName(name string) (EgressType, error) { switch strings.ToLower(name) { case "master": diff --git a/staging/src/k8s.io/apiserver/pkg/server/egress_selector_test.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go similarity index 99% rename from staging/src/k8s.io/apiserver/pkg/server/egress_selector_test.go rename to staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go index 825e53415d8..e24a5bdbb3b 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egress_selector_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package server +package egressselector import ( "context" diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/egress_selector.go b/staging/src/k8s.io/apiserver/pkg/server/options/egress_selector.go index 837e32fcfaf..c5e579c3cfb 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/egress_selector.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/egress_selector.go @@ -68,7 +68,7 @@ func (o *EgressSelectorOptions) ApplyTo(c *server.Config) error { return fmt.Errorf("failed to validate egress selector configuration: %v", errs.ToAggregate()) } - cs, err := server.NewEgressSelector(npConfig) + cs, err := egressselector.NewEgressSelector(npConfig) if err != nil { return fmt.Errorf("failed to setup egress selector with config %#v: %v", npConfig, err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD index 96bece88626..6a74f9eda4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/BUILD @@ -12,6 +12,7 @@ go_library( importpath = "k8s.io/apiserver/pkg/storage/storagebackend", deps = [ "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index f793e3aca55..37c65948e9a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -20,6 +20,7 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/storage/value" ) @@ -38,6 +39,8 @@ type TransportConfig struct { KeyFile string CertFile string CAFile string + // function to determine the egress dialer. (i.e. konnectivity server dialer) + EgressLookup egressselector.Lookup } // Config is configuration for creating a storage backend. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD index 5783f0ef776..aa6d1827955 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/BUILD @@ -34,7 +34,9 @@ go_library( importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 627e547d035..1d01626291a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -19,6 +19,8 @@ package factory import ( "context" "fmt" + "net" + "net/url" "path" "sync" "sync/atomic" @@ -29,7 +31,9 @@ import ( grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -106,17 +110,36 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 { tlsConfig = nil } + networkContext := egressselector.Etcd.AsNetworkContext() + var egressDialer utilnet.DialFunc + if c.EgressLookup != nil { + egressDialer, err = c.EgressLookup(networkContext) + if err != nil { + return nil, err + } + } + dialOptions := []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), + grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), + } + if egressDialer != nil { + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + return egressDialer(ctx, "tcp", u.Host) + } + dialOptions = append(dialOptions, grpc.WithContextDialer(dialer)) + } cfg := clientv3.Config{ DialTimeout: dialTimeout, DialKeepAliveTime: keepaliveTime, DialKeepAliveTimeout: keepaliveTimeout, - DialOptions: []grpc.DialOption{ - grpc.WithBlock(), // block until the underlying connection is up - grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), - grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), - }, - Endpoints: c.ServerList, - TLS: tlsConfig, + DialOptions: dialOptions, + Endpoints: c.ServerList, + TLS: tlsConfig, } return clientv3.New(cfg)