Merge pull request #82048 from cheftako/kas-np4

Add support for konnectivity service to the etcd3 client.
This commit is contained in:
Kubernetes Prow Robot 2019-08-30 16:15:28 -07:00 committed by GitHub
commit c86da8e2c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 69 additions and 25 deletions

View File

@ -443,6 +443,9 @@ func buildGenericConfig(
if lastErr != nil { if lastErr != nil {
return return
} }
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return return
} }

View File

@ -16,7 +16,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/transport:go_default_library", "//staging/src/k8s.io/client-go/transport:go_default_library",
], ],

View File

@ -27,7 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -60,7 +60,7 @@ type KubeletClientConfig struct {
Dial utilnet.DialFunc Dial utilnet.DialFunc
// Lookup will give us a dialer if the egress selector is configured for it // Lookup will give us a dialer if the egress selector is configured for it
Lookup server.EgressSelectorLookup Lookup egressselector.Lookup
} }
// ConnectionInfo provides the information needed to connect to a kubelet // ConnectionInfo provides the information needed to connect to a kubelet
@ -88,7 +88,7 @@ func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
if dialer == nil && config.Lookup != nil { if dialer == nil && config.Lookup != nil {
// Assuming EgressSelector if SSHTunnel is not turned on. // Assuming EgressSelector if SSHTunnel is not turned on.
// We will not get a dialer if egress selector is disabled. // We will not get a dialer if egress selector is disabled.
networkContext := server.NetworkContext{EgressSelectionName: server.Cluster} networkContext := egressselector.Cluster.AsNetworkContext()
dialer, err = config.Lookup(networkContext) dialer, err = config.Lookup(networkContext)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get context dialer for 'cluster': got %v", err) return nil, fmt.Errorf("failed to get context dialer for 'cluster': got %v", err)

View File

@ -11,7 +11,6 @@ go_test(
srcs = [ srcs = [
"config_selfclient_test.go", "config_selfclient_test.go",
"config_test.go", "config_test.go",
"egress_selector_test.go",
"genericapiserver_test.go", "genericapiserver_test.go",
"healthz_test.go", "healthz_test.go",
], ],
@ -23,11 +22,9 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
@ -53,7 +50,6 @@ go_library(
"config_selfclient.go", "config_selfclient.go",
"deprecated_insecure_serving.go", "deprecated_insecure_serving.go",
"doc.go", "doc.go",
"egress_selector.go",
"genericapiserver.go", "genericapiserver.go",
"handler.go", "handler.go",
"healthz.go", "healthz.go",
@ -75,7 +71,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
@ -85,7 +80,6 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/validating:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit:go_default_library", "//staging/src/k8s.io/apiserver/pkg/audit:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library", "//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
@ -104,6 +98,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",

View File

@ -55,6 +55,7 @@ import (
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi" apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
apirequest "k8s.io/apiserver/pkg/endpoints/request" apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericregistry "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server/egressselector"
genericfilters "k8s.io/apiserver/pkg/server/filters" genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes" "k8s.io/apiserver/pkg/server/routes"
@ -97,7 +98,7 @@ type Config struct {
// EgressSelector provides a lookup mechanism for dialing outbound connections. // EgressSelector provides a lookup mechanism for dialing outbound connections.
// It does so based on a EgressSelectorConfiguration which was read at startup. // It does so based on a EgressSelectorConfiguration which was read at startup.
EgressSelector *EgressSelector EgressSelector *egressselector.EgressSelector
// RuleResolver is required to get the list of rules that apply to a given user // RuleResolver is required to get the list of rules that apply to a given user
// in a given namespace // in a given namespace

View File

@ -2,16 +2,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["config.go"], srcs = [
"config.go",
"egress_selector.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector",
importpath = "k8s.io/apiserver/pkg/server/egressselector", importpath = "k8s.io/apiserver/pkg/server/egressselector",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1alpha1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/path:go_default_library", "//vendor/k8s.io/utils/path:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library",
], ],
@ -33,10 +38,14 @@ filegroup(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["config_test.go"], srcs = [
"config_test.go",
"egress_selector_test.go",
],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
], ],
) )

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package server package egressselector
import ( import (
"bufio" "bufio"
@ -34,6 +34,7 @@ import (
var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext
// EgressSelector is the map of network context type to context dialer, for network egress.
type EgressSelector struct { type EgressSelector struct {
egressToDialer map[EgressType]utilnet.DialFunc egressToDialer map[EgressType]utilnet.DialFunc
} }
@ -59,9 +60,10 @@ type NetworkContext struct {
EgressSelectionName EgressType EgressSelectionName EgressType
} }
// EgressSelectorLookup is the interface to get the dialer function for the network context. // Lookup is the interface to get the dialer function for the network context.
type EgressSelectorLookup func(networkContext NetworkContext) (utilnet.DialFunc, error) type Lookup func(networkContext NetworkContext) (utilnet.DialFunc, error)
// String returns the canonical string representation of the egress type
func (s EgressType) String() string { func (s EgressType) String() string {
switch s { switch s {
case Master: case Master:
@ -75,6 +77,11 @@ func (s EgressType) String() string {
} }
} }
// AsNetworkContext is a helper function to make it easy to get the basic NetworkContext objects.
func (s EgressType) AsNetworkContext() NetworkContext {
return NetworkContext{EgressSelectionName: s}
}
func lookupServiceName(name string) (EgressType, error) { func lookupServiceName(name string) (EgressType, error) {
switch strings.ToLower(name) { switch strings.ToLower(name) {
case "master": case "master":

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package server package egressselector
import ( import (
"context" "context"

View File

@ -68,7 +68,7 @@ func (o *EgressSelectorOptions) ApplyTo(c *server.Config) error {
return fmt.Errorf("failed to validate egress selector configuration: %v", errs.ToAggregate()) return fmt.Errorf("failed to validate egress selector configuration: %v", errs.ToAggregate())
} }
cs, err := server.NewEgressSelector(npConfig) cs, err := egressselector.NewEgressSelector(npConfig)
if err != nil { if err != nil {
return fmt.Errorf("failed to setup egress selector with config %#v: %v", npConfig, err) return fmt.Errorf("failed to setup egress selector with config %#v: %v", npConfig, err)
} }

View File

@ -12,6 +12,7 @@ go_library(
importpath = "k8s.io/apiserver/pkg/storage/storagebackend", importpath = "k8s.io/apiserver/pkg/storage/storagebackend",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
], ],
) )

View File

@ -20,6 +20,7 @@ import (
"time" "time"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
) )
@ -38,6 +39,8 @@ type TransportConfig struct {
KeyFile string KeyFile string
CertFile string CertFile string
CAFile string CAFile string
// function to determine the egress dialer. (i.e. konnectivity server dialer)
EgressLookup egressselector.Lookup
} }
// Config is configuration for creating a storage backend. // Config is configuration for creating a storage backend.

View File

@ -34,7 +34,9 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory",
importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",

View File

@ -19,6 +19,8 @@ package factory
import ( import (
"context" "context"
"fmt" "fmt"
"net"
"net/url"
"path" "path"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -29,7 +31,9 @@ import (
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc" "google.golang.org/grpc"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
@ -106,17 +110,36 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error)
if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 { if len(c.CertFile) == 0 && len(c.KeyFile) == 0 && len(c.CAFile) == 0 {
tlsConfig = nil tlsConfig = nil
} }
networkContext := egressselector.Etcd.AsNetworkContext()
var egressDialer utilnet.DialFunc
if c.EgressLookup != nil {
egressDialer, err = c.EgressLookup(networkContext)
if err != nil {
return nil, err
}
}
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
}
if egressDialer != nil {
dialer := func(ctx context.Context, addr string) (net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
return egressDialer(ctx, "tcp", u.Host)
}
dialOptions = append(dialOptions, grpc.WithContextDialer(dialer))
}
cfg := clientv3.Config{ cfg := clientv3.Config{
DialTimeout: dialTimeout, DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime, DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout, DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: []grpc.DialOption{ DialOptions: dialOptions,
grpc.WithBlock(), // block until the underlying connection is up Endpoints: c.ServerList,
grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), TLS: tlsConfig,
grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
},
Endpoints: c.ServerList,
TLS: tlsConfig,
} }
return clientv3.New(cfg) return clientv3.New(cfg)