Add support for konnectivity service to the etcd3 client.

If konnectivity service is enabled, the etcd client will now use it.
This did require moving a few methods to break circular dependencies.

Factored in feedback from lavalamp and wenjiaswe.
This commit is contained in:
Walter Fender 2019-08-27 15:58:06 -07:00
parent afa979c295
commit edbb0fa2fe
13 changed files with 69 additions and 25 deletions

View File

@ -443,6 +443,9 @@ func buildGenericConfig(
if lastErr != nil {
return
}
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}

View File

@ -16,7 +16,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/transport:go_default_library",
],

View File

@ -27,7 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -60,7 +60,7 @@ type KubeletClientConfig struct {
Dial utilnet.DialFunc
// Lookup will give us a dialer if the egress selector is configured for it
Lookup server.EgressSelectorLookup
Lookup egressselector.Lookup
}
// ConnectionInfo provides the information needed to connect to a kubelet
@ -88,7 +88,7 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
if dialer == nil && config.Lookup != nil {
// Assuming EgressSelector if SSHTunnel is not turned on.
// We will not get a dialer if egress selector is disabled.
networkContext := server.NetworkContext{EgressSelectionName: server.Cluster}
networkContext := egressselector.Cluster.AsNetworkContext()
dialer, err = config.Lookup(networkContext)
if err != nil {
return nil, fmt.Errorf("failed to get context dialer for 'cluster': got %v", err)

View File

@ -11,7 +11,6 @@ go_test(
srcs = [
"config_selfclient_test.go",
"config_test.go",
"egress_selector_test.go",
"genericapiserver_test.go",
"healthz_test.go",
],
@ -23,11 +22,9 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
@ -53,7 +50,6 @@ go_library(
"config_selfclient.go",
"deprecated_insecure_serving.go",
"doc.go",
"egress_selector.go",
"genericapiserver.go",
"handler.go",
"healthz.go",
@ -75,7 +71,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
@ -85,7 +80,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
@ -104,6 +98,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",

View File

@ -55,6 +55,7 @@ import (
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server/egressselector"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
@ -97,7 +98,7 @@ type Config struct {
// EgressSelector provides a lookup mechanism for dialing outbound connections.
// It does so based on a EgressSelectorConfiguration which was read at startup.
EgressSelector *EgressSelector
EgressSelector *egressselector.EgressSelector
// RuleResolver is required to get the list of rules that apply to a given user
// in a given namespace

View File

@ -2,16 +2,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["config.go"],
srcs = [
"config.go",
"egress_selector.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector",
importpath = "k8s.io/apiserver/pkg/server/egressselector",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/path:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
@ -33,10 +38,14 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["config_test.go"],
srcs = [
"config_test.go",
"egress_selector_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package server
package egressselector
import (
"bufio"
@ -34,6 +34,7 @@ import (
var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext
// EgressSelector is the map of network context type to context dialer, for network egress.
type EgressSelector struct {
egressToDialer map[EgressType]utilnet.DialFunc
}
@ -59,9 +60,10 @@ type NetworkContext struct {
EgressSelectionName EgressType
}
// EgressSelectorLookup is the interface to get the dialer function for the network context.
type EgressSelectorLookup func(networkContext NetworkContext) (utilnet.DialFunc, error)
// Lookup is the interface to get the dialer function for the network context.
type Lookup func(networkContext NetworkContext) (utilnet.DialFunc, error)
// String returns the canonical string representation of the egress type
func (s EgressType) String() string {
switch s {
case Master:
@ -75,6 +77,11 @@ func (s EgressType) String() string {
}
}
// AsNetworkContext is a helper function to make it easy to get the basic NetworkContext objects.
func (s EgressType) AsNetworkContext() NetworkContext {
return NetworkContext{EgressSelectionName: s}
}
func lookupServiceName(name string) (EgressType, error) {
switch strings.ToLower(name) {
case "master":

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package server
package egressselector
import (
"context"

View File

@ -68,7 +68,7 @@ func (o *EgressSelectorOptions) ApplyTo(c *server.Config) error {
return fmt.Errorf("failed to validate egress selector configuration: %v", errs.ToAggregate())
}
cs, err := server.NewEgressSelector(npConfig)
cs, err := egressselector.NewEgressSelector(npConfig)
if err != nil {
return fmt.Errorf("failed to setup egress selector with config %#v: %v", npConfig, err)
}

View File

@ -12,6 +12,7 @@ go_library(
importpath = "k8s.io/apiserver/pkg/storage/storagebackend",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
],
)

View File

@ -20,6 +20,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage/value"
)
@ -38,6 +39,8 @@ type TransportConfig struct {
KeyFile string
CertFile string
CAFile string
// function to determine the egress dialer. (i.e. konnectivity server dialer)
EgressLookup egressselector.Lookup
}
// Config is configuration for creating a storage backend.

View File

@ -34,7 +34,9 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory",
importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",

View File

@ -19,6 +19,8 @@ package factory
import (
"context"
"fmt"
"net"
"net/url"
"path"
"sync"
"sync/atomic"
@ -29,7 +31,9 @@ import (
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -106,17 +110,36 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error)
if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
tlsConfig = nil
}
networkContext := egressselector.Etcd.AsNetworkContext()
var egressDialer utilnet.DialFunc
if c.EgressLookup != nil {
egressDialer, err = c.EgressLookup(networkContext)
if err != nil {
return nil, err
}
}
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
}
if egressDialer != nil {
dialer := func(ctx context.Context, addr string) (net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
return egressDialer(ctx, "tcp", u.Host)
}
dialOptions = append(dialOptions, grpc.WithContextDialer(dialer))
}
cfg := clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},
Endpoints: c.ServerList,
TLS: tlsConfig,
DialOptions: dialOptions,
Endpoints: c.ServerList,
TLS: tlsConfig,
}
return clientv3.New(cfg)