Merge pull request #88549 from caesarxuchao/egressSelector-metrics

Add metrics for egress dials
This commit is contained in:
Kubernetes Prow Robot 2020-03-06 11:05:24 -08:00 committed by GitHub
commit 1836f95260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 441 additions and 87 deletions

View File

@ -16,9 +16,11 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1beta1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/path:go_default_library",
"//vendor/k8s.io/utils/trace:go_default_library",
"//vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
@ -33,7 +35,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
@ -47,7 +52,11 @@ go_test(
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
],
)

View File

@ -22,16 +22,21 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"io/ioutil"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
"k8s.io/klog"
"net"
"net/http"
"net/url"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
"strings"
"time"
"google.golang.org/grpc"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
)
// directDialer is the DialContext function of http.DefaultTransport. It is the
// dial function handed out when no proxy server is involved.
var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext
@ -123,16 +128,122 @@ func tunnelHTTPConnect(proxyConn net.Conn, proxyAddress, addr string) (net.Conn,
return proxyConn, nil
}
func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialFunc, error) {
clientCert := tcpTransport.TLSConfig.ClientCert
clientKey := tcpTransport.TLSConfig.ClientKey
caCert := tcpTransport.TLSConfig.CABundle
proxyURL, err := url.Parse(tcpTransport.URL)
if err != nil {
return nil, fmt.Errorf("invalid proxy server url %q: %v", tcpTransport.URL, err)
}
proxyAddress := proxyURL.Host
// proxier opens connections to destination addresses. Implementations are
// produced by a proxyServerConnector once it has connected to the proxy server.
type proxier interface {
// proxy returns a connection to addr.
proxy(addr string) (net.Conn, error)
}
// Compile-time check that *httpConnectProxier implements proxier.
var _ proxier = (*httpConnectProxier)(nil)

// httpConnectProxier reaches a destination by handing its connection to the
// proxy server over to tunnelHTTPConnect.
type httpConnectProxier struct {
	// conn is the connection to the proxy server.
	conn net.Conn
	// proxyAddress is passed to tunnelHTTPConnect together with conn.
	proxyAddress string
}

// proxy returns whatever tunnelHTTPConnect yields for addr over the stored
// proxy connection.
func (p *httpConnectProxier) proxy(addr string) (net.Conn, error) {
	tunneled, err := tunnelHTTPConnect(p.conn, p.proxyAddress, addr)
	return tunneled, err
}
// Compile-time check that *grpcProxier implements proxier.
var _ proxier = (*grpcProxier)(nil)

// grpcProxier reaches a destination by dialing through a konnectivity-client
// tunnel.
type grpcProxier struct {
	// tunnel is the tunnel every proxy call dials through.
	tunnel client.Tunnel
}

// proxy dials addr over the tunnel using the "tcp" network.
func (p *grpcProxier) proxy(addr string) (net.Conn, error) {
	const network = "tcp"
	return p.tunnel.Dial(network, addr)
}
// proxyServerConnector abstracts how the connection to the proxy server is
// made, so that the dialer built by dialerCreator does not depend on the
// transport or protocol in use.
type proxyServerConnector interface {
// connect establishes connection to the proxy server, and returns a
// proxier based on the connection.
connect() (proxier, error)
}
// tcpHTTPConnectConnector connects to the proxy server over TLS on TCP and
// hands back an HTTP CONNECT based proxier.
type tcpHTTPConnectConnector struct {
	// proxyAddress is the address dialed with tls.Dial.
	proxyAddress string
	// tlsConfig is the TLS configuration used for that dial.
	tlsConfig *tls.Config
}

// connect opens a TLS connection to the proxy server and wraps it in an
// httpConnectProxier. A dial error is returned as-is.
func (c *tcpHTTPConnectConnector) connect() (proxier, error) {
	tlsConn, err := tls.Dial("tcp", c.proxyAddress, c.tlsConfig)
	if err == nil {
		return &httpConnectProxier{conn: tlsConn, proxyAddress: c.proxyAddress}, nil
	}
	return nil, err
}
// udsHTTPConnectConnector connects to the proxy server over a unix domain
// socket and hands back an HTTP CONNECT based proxier.
type udsHTTPConnectConnector struct {
	// udsName is the socket name dialed on the "unix" network.
	udsName string
}

// connect dials the unix socket and wraps the connection in an
// httpConnectProxier whose proxyAddress is the socket name. A dial error is
// returned as-is.
func (c *udsHTTPConnectConnector) connect() (proxier, error) {
	socketConn, err := net.Dial("unix", c.udsName)
	if err == nil {
		return &httpConnectProxier{conn: socketConn, proxyAddress: c.udsName}, nil
	}
	return nil, err
}
// udsGRPCConnector connects to the proxy server with gRPC over a unix domain
// socket.
type udsGRPCConnector struct {
	// udsName is the socket name dialed on the "unix" network.
	udsName string
}

// connect creates a gRPC tunnel to the proxy server over the unix domain
// socket and returns a proxier that dials through that tunnel. An error from
// creating the tunnel is returned as-is.
func (u *udsGRPCConnector) connect() (proxier, error) {
	udsName := u.udsName
	// gRPC supplies the target address, but the socket name is always used
	// instead. The context gRPC passes in is honored so that gRPC can cancel
	// or time out the socket dial.
	dialOption := grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
		var dialer net.Dialer
		c, err := dialer.DialContext(ctx, "unix", udsName)
		if err != nil {
			klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
		}
		return c, err
	})
	tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	return &grpcProxier{tunnel: tunnel}, nil
}
// dialerCreator holds what createDialer needs to build a dial function.
type dialerCreator struct {
// connector connects to the proxy server on every dial; it is not used
// when direct is true.
connector proxyServerConnector
// direct makes createDialer return directDialer instead of a proxying
// dialer.
direct bool
// options supplies the label values used for the trace and the metrics.
options metricsOptions
}
// metricsOptions carries the label values reported with the dial metrics.
type metricsOptions struct {
// transport is the value of the "transport" metric label.
transport string
// protocol is the value of the "protocol" metric label.
protocol string
}
// createDialer returns the dial function for this configuration.
//
// When d.direct is set it returns directDialer. Otherwise the returned
// function connects to the proxy server through d.connector and then asks the
// resulting proxier for a connection to addr. A failure at either step is
// counted under the matching stage label and returned as-is; a success records
// the total dial latency. The ctx and network arguments of the returned
// function are not consulted.
func (d *dialerCreator) createDialer() utilnet.DialFunc {
	if d.direct {
		return directDialer
	}
	return func(ctx context.Context, network, addr string) (net.Conn, error) {
		// Name the configured protocol in the trace: this dialer serves gRPC
		// as well as HTTP Connect.
		trace := utiltrace.New(fmt.Sprintf("Proxy via %s protocol over %s", d.options.protocol, d.options.transport), utiltrace.Field{Key: "address", Value: addr})
		defer trace.LogIfLong(500 * time.Millisecond)
		// Use the metrics clock so tests can substitute a fake one.
		start := egressmetrics.Metrics.Clock().Now()
		proxier, err := d.connector.connect()
		if err != nil {
			egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageConnect)
			return nil, err
		}
		conn, err := proxier.proxy(addr)
		if err != nil {
			egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageProxy)
			return nil, err
		}
		egressmetrics.Metrics.ObserveDialLatency(egressmetrics.Metrics.Clock().Now().Sub(start), d.options.protocol, d.options.transport)
		return conn, nil
	}
}
func getTLSConfig(t *apiserver.TLSConfig) (*tls.Config, error) {
clientCert := t.ClientCert
clientKey := t.ClientKey
caCert := t.CABundle
clientCerts, err := tls.LoadX509KeyPair(clientCert, clientKey)
if err != nil {
return nil, fmt.Errorf("failed to read key pair %s & %s, got %v", clientCert, clientKey, err)
@ -151,56 +262,75 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
// Use host's root CA set instead of providing our own
certPool = nil
}
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
klog.V(4).Infof("Sending request to %q.", addr)
proxyConn, err := tls.Dial("tcp", proxyAddress,
&tls.Config{
Certificates: []tls.Certificate{clientCerts},
RootCAs: certPool,
},
)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
}
return tunnelHTTPConnect(proxyConn, proxyAddress, addr)
}
return contextDialer, nil
return &tls.Config{
Certificates: []tls.Certificate{clientCerts},
RootCAs: certPool,
}, nil
}
func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) {
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
proxyConn, err := net.Dial("unix", udsConfig.UDSName)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err)
}
return tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr)
func getProxyAddress(urlString string) (string, error) {
proxyURL, err := url.Parse(urlString)
if err != nil {
return "", fmt.Errorf("invalid proxy server url %q: %v", urlString, err)
}
return contextDialer, nil
return proxyURL.Host, nil
}
func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
func connectionToDialerCreator(c apiserver.Connection) (*dialerCreator, error) {
switch c.ProxyProtocol {
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
c, err := net.Dial("unix", udsName)
case apiserver.ProtocolHTTPConnect:
if c.Transport.UDS != nil {
return &dialerCreator{
connector: &udsHTTPConnectConnector{
udsName: c.Transport.UDS.UDSName,
},
options: metricsOptions{
transport: egressmetrics.TransportUDS,
protocol: egressmetrics.ProtocolHTTPConnect,
},
}, nil
} else if c.Transport.TCP != nil {
tlsConfig, err := getTLSConfig(c.Transport.TCP.TLSConfig)
if err != nil {
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
return nil, err
}
return c, err
})
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
if err != nil {
return nil, err
proxyAddress, err := getProxyAddress(c.Transport.TCP.URL)
if err != nil {
return nil, err
}
return &dialerCreator{
connector: &tcpHTTPConnectConnector{
tlsConfig: tlsConfig,
proxyAddress: proxyAddress,
},
options: metricsOptions{
transport: egressmetrics.TransportTCP,
protocol: egressmetrics.ProtocolHTTPConnect,
},
}, nil
} else {
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
}
proxyConn, err := tunnel.Dial("tcp", addr)
if err != nil {
return nil, err
case apiserver.ProtocolGRPC:
if c.Transport.UDS != nil {
return &dialerCreator{
connector: &udsGRPCConnector{
udsName: c.Transport.UDS.UDSName,
},
options: metricsOptions{
transport: egressmetrics.TransportUDS,
protocol: egressmetrics.ProtocolGRPC,
},
}, nil
}
return proxyConn, nil
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
case apiserver.ProtocolDirect:
return &dialerCreator{direct: true}, nil
default:
return nil, fmt.Errorf("unrecognized service connection protocol %q", c.ProxyProtocol)
}
return contextDialer, nil
}
// NewEgressSelector configures lookup mechanism for Lookup.
@ -218,40 +348,11 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe
if err != nil {
return nil, err
}
switch service.Connection.ProxyProtocol {
case apiserver.ProtocolHTTPConnect:
if service.Connection.Transport.UDS != nil {
contextDialer, err := createConnectUDSDialer(service.Connection.Transport.UDS)
if err != nil {
return nil, fmt.Errorf("failed to create HTTPConnect uds dialer: %v", err)
}
cs.egressToDialer[name] = contextDialer
} else if service.Connection.Transport.TCP != nil {
contextDialer, err := createConnectTCPDialer(service.Connection.Transport.TCP)
if err != nil {
return nil, fmt.Errorf("failed to create HTTPConnect dialer: %v", err)
}
cs.egressToDialer[name] = contextDialer
} else {
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
}
case apiserver.ProtocolGRPC:
if service.Connection.Transport.UDS != nil {
grpcContextDialer, err := createGRPCUDSDialer(service.Connection.Transport.UDS.UDSName)
if err != nil {
return nil, fmt.Errorf("failed to create grpc dialer: %v", err)
}
cs.egressToDialer[name] = grpcContextDialer
} else {
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
}
case apiserver.ProtocolDirect:
cs.egressToDialer[name] = directDialer
default:
return nil, fmt.Errorf("unrecognized service connection protocol %q", service.Connection.ProxyProtocol)
dialerCreator, err := connectionToDialerCreator(service.Connection)
if err != nil {
return nil, fmt.Errorf("failed to create dialer for egressSelection %q: %v", name, err)
}
cs.egressToDialer[name] = dialerCreator.createDialer()
}
return cs, nil
}

View File

@ -18,12 +18,19 @@ package egressselector
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
"k8s.io/apiserver/pkg/server/egressselector/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
type fakeEgressSelection struct {
@ -163,3 +170,97 @@ func validateDirectDialer(dialer utilnet.DialFunc, s *fakeEgressSelection) (bool
}
return s.directDialerCalled, nil
}
// fakeProxyServerConnector is a proxyServerConnector test double whose
// outcomes are selected by its fields.
type fakeProxyServerConnector struct {
	// connectorErr makes connect itself fail.
	connectorErr bool
	// proxierErr is copied into the fakeProxier returned by connect.
	proxierErr bool
}

// connect returns a fakeProxier, or "fake error" when connectorErr is set.
func (c *fakeProxyServerConnector) connect() (proxier, error) {
	if !c.connectorErr {
		return &fakeProxier{err: c.proxierErr}, nil
	}
	return nil, fmt.Errorf("fake error")
}
// fakeProxier is a proxier test double that never opens a connection.
type fakeProxier struct {
	// err makes proxy fail.
	err bool
}

// proxy ignores its argument and returns a nil connection, together with
// "fake error" when err is set and a nil error otherwise.
func (p *fakeProxier) proxy(_ string) (net.Conn, error) {
	if !p.err {
		return nil, nil
	}
	return nil, fmt.Errorf("fake error")
}
// TestMetrics drives the dialer built by dialerCreator with fake connectors
// and checks both the error it returns and the metrics it records: a failure
// counter labeled with the failing stage, or a latency observation on success.
func TestMetrics(t *testing.T) {
	testcases := map[string]struct {
		connectorErr bool
		proxierErr   bool
		metrics      []string
		want         string
	}{
		"connect to proxy server error": {
			connectorErr: true,
			proxierErr:   false,
			metrics:      []string{"apiserver_egress_dialer_dial_failure_count"},
			want: `
# HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed
# TYPE apiserver_egress_dialer_dial_failure_count counter
apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="connect",transport="fake_transport"} 1
`,
		},
		"connect succeeded, proxy failed": {
			connectorErr: false,
			proxierErr:   true,
			metrics:      []string{"apiserver_egress_dialer_dial_failure_count"},
			want: `
# HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed
# TYPE apiserver_egress_dialer_dial_failure_count counter
apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="proxy",transport="fake_transport"} 1
`,
		},
		"successful": {
			connectorErr: false,
			proxierErr:   false,
			metrics:      []string{"apiserver_egress_dialer_dial_duration_seconds"},
			want: `
# HELP apiserver_egress_dialer_dial_duration_seconds [ALPHA] Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds)
# TYPE apiserver_egress_dialer_dial_duration_seconds histogram
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.005"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.025"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.1"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.5"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="2.5"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="12.5"} 1
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="+Inf"} 1
apiserver_egress_dialer_dial_duration_seconds_sum{protocol="fake_protocol",transport="fake_transport"} 0
apiserver_egress_dialer_dial_duration_seconds_count{protocol="fake_protocol",transport="fake_transport"} 1
`,
		},
	}

	// metrics.Metrics is shared package state: put the original clock back so
	// the fake clock does not leak into other tests.
	originalClock := metrics.Metrics.Clock()
	defer metrics.Metrics.SetClock(originalClock)

	for tn, tc := range testcases {
		t.Run(tn, func(t *testing.T) {
			metrics.Metrics.Reset()
			// The fake clock is never advanced, so a successful dial is
			// observed with a duration of zero.
			metrics.Metrics.SetClock(clock.NewFakeClock(time.Now()))
			d := dialerCreator{
				connector: &fakeProxyServerConnector{
					connectorErr: tc.connectorErr,
					proxierErr:   tc.proxierErr,
				},
				options: metricsOptions{
					transport: "fake_transport",
					protocol:  "fake_protocol",
				},
			}
			dialer := d.createDialer()
			// The dial must fail exactly when one of the fakes is told to fail.
			_, err := dialer(context.TODO(), "", "")
			wantErr := tc.connectorErr || tc.proxierErr
			if gotErr := err != nil; gotErr != wantErr {
				t.Errorf("dialer returned error %v, want error: %t", err, wantErr)
			}
			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tc.want), tc.metrics...); err != nil {
				t.Errorf("Err in comparing metrics %v", err)
			}
		})
	}
}

View File

@ -0,0 +1,28 @@
# Build rules for the k8s.io/apiserver/pkg/server/egressselector/metrics package.
load("@io_bazel_rules_go//go:def.bzl", "go_library")
# Go library built from metrics.go; visible to every package.
go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector/metrics",
importpath = "k8s.io/apiserver/pkg/server/egressselector/metrics",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
],
)
# Every file under this package directory; private to this package.
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
# Publicly visible wrapper around :package-srcs.
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,114 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
// namespace and subsystem are the Namespace and Subsystem of both dial
// metrics.
namespace = "apiserver"
subsystem = "egress_dialer"
// ProtocolHTTPConnect means that the proxy protocol is http-connect.
// NOTE(review): the label value uses an underscore ("http_connect") while
// the metric Help strings say "http-connect"; confirm which spelling is
// intended before changing either.
ProtocolHTTPConnect = "http_connect"
// ProtocolGRPC means that the proxy protocol is the GRPC protocol.
ProtocolGRPC = "grpc"
// TransportTCP means that the transport is TCP.
TransportTCP = "tcp"
// TransportUDS means that the transport is UDS.
TransportUDS = "uds"
// StageConnect indicates that the dial failed at establishing connection to the proxy server.
StageConnect = "connect"
// StageProxy indicates that the dial failed at requesting the proxy server to proxy.
StageProxy = "proxy"
)
var (
// Use buckets ranging from 5 ms to 12.5 seconds.
latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5}
// Metrics provides access to all dial metrics.
// It is created by newDialMetrics when the package is initialized, which
// also registers the metrics with the legacy registry.
Metrics = newDialMetrics()
)
// DialMetrics instruments dials to proxy server with prometheus metrics.
type DialMetrics struct {
// clock is the time source exposed through Clock and replaced by SetClock.
clock clock.Clock
// latencies is the dial latency histogram, labeled by protocol and transport.
latencies *metrics.HistogramVec
// failures is the dial failure counter, labeled by protocol, transport and stage.
failures *metrics.CounterVec
}
// newDialMetrics creates a new DialMetrics, configured with default metric
// names and a real clock, and registers both metrics with the legacy registry.
func newDialMetrics() *DialMetrics {
	// Labels shared by both metrics; the failure counter adds "stage".
	latencyLabels := []string{"protocol", "transport"}
	failureLabels := []string{"protocol", "transport", "stage"}

	latencyOpts := &metrics.HistogramOpts{
		Namespace:      namespace,
		Subsystem:      subsystem,
		Name:           "dial_duration_seconds",
		Help:           "Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds)",
		Buckets:        latencyBuckets,
		StabilityLevel: metrics.ALPHA,
	}
	failureOpts := &metrics.CounterOpts{
		Namespace:      namespace,
		Subsystem:      subsystem,
		Name:           "dial_failure_count",
		Help:           "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed",
		StabilityLevel: metrics.ALPHA,
	}

	m := &DialMetrics{
		latencies: metrics.NewHistogramVec(latencyOpts, latencyLabels),
		failures:  metrics.NewCounterVec(failureOpts, failureLabels),
		clock:     clock.RealClock{},
	}
	legacyregistry.MustRegister(m.latencies)
	legacyregistry.MustRegister(m.failures)
	return m
}
// Clock returns the clock currently stored in m: the real clock unless
// SetClock has replaced it. The field is read without any synchronization.
func (m *DialMetrics) Clock() clock.Clock {
return m.clock
}
// SetClock replaces the clock returned by Clock with c. The field is written
// without any synchronization.
func (m *DialMetrics) SetClock(c clock.Clock) {
m.clock = c
}
// Reset calls Reset on both the failure counter and the latency histogram.
func (m *DialMetrics) Reset() {
	m.failures.Reset()
	m.latencies.Reset()
}
// ObserveDialLatency adds one observation of elapsed, converted to seconds, to
// the latency histogram for the given protocol and transport labels.
func (m *DialMetrics) ObserveDialLatency(elapsed time.Duration, protocol, transport string) {
	histogram := m.latencies.WithLabelValues(protocol, transport)
	seconds := elapsed.Seconds()
	histogram.Observe(seconds)
}
// ObserveDialFailure increments the failure counter for the given protocol,
// transport and stage labels, where stage is the step at which the dial failed.
func (m *DialMetrics) ObserveDialFailure(protocol, transport, stage string) {
	counter := m.failures.WithLabelValues(protocol, transport, stage)
	counter.Inc()
}

1
vendor/modules.txt vendored
View File

@ -1348,6 +1348,7 @@ k8s.io/apiserver/pkg/registry/rest/resttest
k8s.io/apiserver/pkg/server
k8s.io/apiserver/pkg/server/dynamiccertificates
k8s.io/apiserver/pkg/server/egressselector
k8s.io/apiserver/pkg/server/egressselector/metrics
k8s.io/apiserver/pkg/server/filters
k8s.io/apiserver/pkg/server/healthz
k8s.io/apiserver/pkg/server/httplog