From fbb1fb8902c06cbcce47a025ce22fe260b27a697 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Tue, 25 Feb 2020 14:23:24 -0800 Subject: [PATCH 1/3] add metrics and traces for egress dials --- .../server/egressselector/egress_selector.go | 45 ++++++-- .../server/egressselector/metrics/metrics.go | 103 ++++++++++++++++++ 2 files changed, 140 insertions(+), 8 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go index d333debefbe..3e591b70e39 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go @@ -22,16 +22,20 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "google.golang.org/grpc" "io/ioutil" - utilnet "k8s.io/apimachinery/pkg/util/net" - "k8s.io/apiserver/pkg/apis/apiserver" - "k8s.io/klog" "net" "net/http" "net/url" - client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" "strings" + "time" + + "google.golang.org/grpc" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/apis/apiserver" + egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics" + "k8s.io/klog" + utiltrace "k8s.io/utils/trace" + client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext @@ -152,7 +156,10 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF certPool = nil } contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { + trace := utiltrace.New("Proxy via HTTP Connect over TCP", utiltrace.Field{Key: "address", Value: addr}) + defer trace.LogIfLong(500 * time.Millisecond) klog.V(4).Infof("Sending request to %q.", addr) + start := time.Now() proxyConn, err := 
tls.Dial("tcp", proxyAddress, &tls.Config{ Certificates: []tls.Certificate{clientCerts}, @@ -160,27 +167,46 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF }, ) if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageDial) return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err) } - return tunnelHTTPConnect(proxyConn, proxyAddress, addr) + ret, err := tunnelHTTPConnect(proxyConn, proxyAddress, addr) + if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageProxy) + return nil, err + } + egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP) + return ret, nil } return contextDialer, nil } func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) { contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { + trace := utiltrace.New("Proxy via HTTP Connect over UDS", utiltrace.Field{Key: "address", Value: addr}) + defer trace.LogIfLong(500 * time.Millisecond) + start := time.Now() proxyConn, err := net.Dial("unix", udsConfig.UDSName) if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageDial) return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err) } - return tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr) + ret, err := tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr) + if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageProxy) + return nil, err + } + egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS) + return ret, nil } return contextDialer, nil } func 
createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) { contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { - + trace := utiltrace.New("Proxy via GRPC over UDS", utiltrace.Field{Key: "address", Value: addr}) + defer trace.LogIfLong(500 * time.Millisecond) + start := time.Now() dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { c, err := net.Dial("unix", udsName) if err != nil { @@ -191,13 +217,16 @@ func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) { tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure()) if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageDial) return nil, err } proxyConn, err := tunnel.Dial("tcp", addr) if err != nil { + egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageProxy) return nil, err } + egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS) return proxyConn, nil } return contextDialer, nil diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go new file mode 100644 index 00000000000..67422de25b7 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go @@ -0,0 +1,103 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "time" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + namespace = "apiserver" + subsystem = "egress_dialer" + + // ProtocolHTTPConnect means that the proxy protocol is http-connect. + ProtocolHTTPConnect = "http_connect" + // ProtocolHTTPGRPC means that the proxy protocol is the GRPC protocol. + ProtocolGRPC = "grpc" + // TransportTCP means that the transport is TCP. + TransportTCP = "tcp" + // TransportUDS means that the transport is UDS. + TransportUDS = "uds" + // StageTransport indicates that the dial failed at dialing to the proxy server. + StageDial = "dial" + // StageProtocol indicates that the dial failed at requesting the proxy server to proxy. + StageProxy = "proxy" +) + +var ( + // Use buckets ranging from 5 ms to 12.5 seconds. + latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5} + latencySummaryMaxAge = 5 * time.Hour + + // Metrics provides access to all dial metrics. + Metrics = newDialMetrics() +) + +// DialMetrics instruments dials to proxy server with prometheus metrics. +type DialMetrics struct { + latencies *metrics.HistogramVec + failures *metrics.CounterVec +} + +// newDialMetrics creates a new DialMetrics, configured with default metric names. 
+func newDialMetrics() *DialMetrics { + latencies := metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dial_duration_seconds", + Help: "Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds)", + Buckets: latencyBuckets, + StabilityLevel: metrics.ALPHA, + }, + []string{"protocol", "transport"}, + ) + + failures := metrics.NewCounterVec( + &metrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dial_failure_count", + Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (dial or proxy). The stage indicates at which stage the dial failed", + StabilityLevel: metrics.ALPHA, + }, + []string{"protocol", "transport", "stage"}, + ) + + legacyregistry.MustRegister(latencies) + legacyregistry.MustRegister(failures) + return &DialMetrics{latencies: latencies, failures: failures} +} + +// Reset resets the metrics. +func (m *DialMetrics) Reset() { + m.latencies.Reset() + m.failures.Reset() +} + +// ObserveDialLatency records the latency of a dial, labeled by protocol, transport. +func (m *DialMetrics) ObserveDialLatency(elapsed time.Duration, protocol, transport string) { + m.latencies.WithLabelValues(protocol, transport).Observe(elapsed.Seconds()) +} + +// ObserverDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at. 
+func (m *DialMetrics) ObserveDialFailure(protocol, transport, stage string) { + m.failures.WithLabelValues(protocol, transport, stage).Inc() +} From bac9351c64671ce4d5198d431c97bf1ccd72752f Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Wed, 26 Feb 2020 16:00:43 -0800 Subject: [PATCH 2/3] refactor egress dialer construction code and add unit test --- .../server/egressselector/egress_selector.go | 286 +++++++++++------- .../egressselector/egress_selector_test.go | 101 +++++++ .../server/egressselector/metrics/metrics.go | 29 +- 3 files changed, 300 insertions(+), 116 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go index 3e591b70e39..b8bdec2804c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go @@ -30,6 +30,7 @@ import ( "time" "google.golang.org/grpc" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/apis/apiserver" egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics" @@ -127,16 +128,122 @@ func tunnelHTTPConnect(proxyConn net.Conn, proxyAddress, addr string) (net.Conn, return proxyConn, nil } -func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialFunc, error) { - clientCert := tcpTransport.TLSConfig.ClientCert - clientKey := tcpTransport.TLSConfig.ClientKey - caCert := tcpTransport.TLSConfig.CABundle - proxyURL, err := url.Parse(tcpTransport.URL) - if err != nil { - return nil, fmt.Errorf("invalid proxy server url %q: %v", tcpTransport.URL, err) - } - proxyAddress := proxyURL.Host +type proxier interface { + // proxy returns a connection to addr. 
+ proxy(addr string) (net.Conn, error) +} +var _ proxier = &httpConnectProxier{} + +type httpConnectProxier struct { + conn net.Conn + proxyAddress string +} + +func (t *httpConnectProxier) proxy(addr string) (net.Conn, error) { + return tunnelHTTPConnect(t.conn, t.proxyAddress, addr) +} + +var _ proxier = &grpcProxier{} + +type grpcProxier struct { + tunnel client.Tunnel +} + +func (g *grpcProxier) proxy(addr string) (net.Conn, error) { + return g.tunnel.Dial("tcp", addr) +} + +type proxyServerConnector interface { + // connect establishes connection to the proxy server, and returns a + // proxier based on the connection. + connect() (proxier, error) +} + +type tcpHTTPConnectConnector struct { + proxyAddress string + tlsConfig *tls.Config +} + +func (t *tcpHTTPConnectConnector) connect() (proxier, error) { + conn, err := tls.Dial("tcp", t.proxyAddress, t.tlsConfig) + if err != nil { + return nil, err + } + return &httpConnectProxier{conn: conn, proxyAddress: t.proxyAddress}, nil +} + +type udsHTTPConnectConnector struct { + udsName string +} + +func (u *udsHTTPConnectConnector) connect() (proxier, error) { + conn, err := net.Dial("unix", u.udsName) + if err != nil { + return nil, err + } + return &httpConnectProxier{conn: conn, proxyAddress: u.udsName}, nil +} + +type udsGRPCConnector struct { + udsName string +} + +func (u *udsGRPCConnector) connect() (proxier, error) { + udsName := u.udsName + dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + c, err := net.Dial("unix", udsName) + if err != nil { + klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err) + } + return c, err + }) + + tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure()) + if err != nil { + return nil, err + } + return &grpcProxier{tunnel: tunnel}, nil +} + +type dialerCreator struct { + connector proxyServerConnector + direct bool + options metricsOptions +} + +type metricsOptions struct { + transport 
string + protocol string +} + +func (d *dialerCreator) createDialer() utilnet.DialFunc { + if d.direct { + return directDialer + } + return func(ctx context.Context, network, addr string) (net.Conn, error) { + trace := utiltrace.New(fmt.Sprintf("Proxy via %s protocol over %s", d.options.protocol, d.options.transport), utiltrace.Field{Key: "address", Value: addr}) + defer trace.LogIfLong(500 * time.Millisecond) + start := egressmetrics.Metrics.Clock().Now() + proxier, err := d.connector.connect() + if err != nil { + egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageConnect) + return nil, err + } + conn, err := proxier.proxy(addr) + if err != nil { + egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageProxy) + return nil, err + } + egressmetrics.Metrics.ObserveDialLatency(egressmetrics.Metrics.Clock().Now().Sub(start), d.options.protocol, d.options.transport) + return conn, nil + } +} + +func getTLSConfig(t *apiserver.TLSConfig) (*tls.Config, error) { + clientCert := t.ClientCert + clientKey := t.ClientKey + caCert := t.CABundle clientCerts, err := tls.LoadX509KeyPair(clientCert, clientKey) if err != nil { return nil, fmt.Errorf("failed to read key pair %s & %s, got %v", clientCert, clientKey, err) @@ -155,81 +262,75 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF // Use host's root CA set instead of providing our own certPool = nil } - contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { - trace := utiltrace.New("Proxy via HTTP Connect over TCP", utiltrace.Field{Key: "address", Value: addr}) - defer trace.LogIfLong(500 * time.Millisecond) - klog.V(4).Infof("Sending request to %q.", addr) - start := time.Now() - proxyConn, err := tls.Dial("tcp", proxyAddress, - &tls.Config{ - Certificates: []tls.Certificate{clientCerts}, - RootCAs: certPool, - }, - ) - if err != nil { - 
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageDial) - return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err) - } - ret, err := tunnelHTTPConnect(proxyConn, proxyAddress, addr) - if err != nil { - egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageProxy) - return nil, err - } - egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP) - return ret, nil - } - return contextDialer, nil + return &tls.Config{ + Certificates: []tls.Certificate{clientCerts}, + RootCAs: certPool, + }, nil } -func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) { - contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { - trace := utiltrace.New("Proxy via HTTP Connect over UDS", utiltrace.Field{Key: "address", Value: addr}) - defer trace.LogIfLong(500 * time.Millisecond) - start := time.Now() - proxyConn, err := net.Dial("unix", udsConfig.UDSName) - if err != nil { - egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageDial) - return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err) - } - ret, err := tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr) - if err != nil { - egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageProxy) - return nil, err - } - egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS) - return ret, nil +func getProxyAddress(urlString string) (string, error) { + proxyURL, err := url.Parse(urlString) + if err != nil { + return "", fmt.Errorf("invalid proxy server url %q: %v", urlString, err) } - return contextDialer, nil + return proxyURL.Host, nil } 
-func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) { - contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { - trace := utiltrace.New("Proxy via GRPC over UDS", utiltrace.Field{Key: "address", Value: addr}) - defer trace.LogIfLong(500 * time.Millisecond) - start := time.Now() - dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { - c, err := net.Dial("unix", udsName) +func connectionToDialerCreator(c apiserver.Connection) (*dialerCreator, error) { + switch c.ProxyProtocol { + + case apiserver.ProtocolHTTPConnect: + if c.Transport.UDS != nil { + return &dialerCreator{ + connector: &udsHTTPConnectConnector{ + udsName: c.Transport.UDS.UDSName, + }, + options: metricsOptions{ + transport: egressmetrics.TransportUDS, + protocol: egressmetrics.ProtocolHTTPConnect, + }, + }, nil + } else if c.Transport.TCP != nil { + tlsConfig, err := getTLSConfig(c.Transport.TCP.TLSConfig) if err != nil { - klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err) + return nil, err } - return c, err - }) - - tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure()) - if err != nil { - egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageDial) - return nil, err + proxyAddress, err := getProxyAddress(c.Transport.TCP.URL) + if err != nil { + return nil, err + } + return &dialerCreator{ + connector: &tcpHTTPConnectConnector{ + tlsConfig: tlsConfig, + proxyAddress: proxyAddress, + }, + options: metricsOptions{ + transport: egressmetrics.TransportTCP, + protocol: egressmetrics.ProtocolHTTPConnect, + }, + }, nil + } else { + return nil, fmt.Errorf("Either a TCP or UDS transport must be specified") } - - proxyConn, err := tunnel.Dial("tcp", addr) - if err != nil { - egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageProxy) - return nil, err + 
case apiserver.ProtocolGRPC: + if c.Transport.UDS != nil { + return &dialerCreator{ + connector: &udsGRPCConnector{ + udsName: c.Transport.UDS.UDSName, + }, + options: metricsOptions{ + transport: egressmetrics.TransportUDS, + protocol: egressmetrics.ProtocolGRPC, + }, + }, nil } - egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS) - return proxyConn, nil + return nil, fmt.Errorf("UDS transport must be specified for GRPC") + case apiserver.ProtocolDirect: + return &dialerCreator{direct: true}, nil + default: + return nil, fmt.Errorf("unrecognized service connection protocol %q", c.ProxyProtocol) } - return contextDialer, nil + } // NewEgressSelector configures lookup mechanism for Lookup. @@ -247,40 +348,11 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe if err != nil { return nil, err } - switch service.Connection.ProxyProtocol { - - case apiserver.ProtocolHTTPConnect: - if service.Connection.Transport.UDS != nil { - contextDialer, err := createConnectUDSDialer(service.Connection.Transport.UDS) - if err != nil { - return nil, fmt.Errorf("failed to create HTTPConnect uds dialer: %v", err) - } - cs.egressToDialer[name] = contextDialer - } else if service.Connection.Transport.TCP != nil { - contextDialer, err := createConnectTCPDialer(service.Connection.Transport.TCP) - if err != nil { - return nil, fmt.Errorf("failed to create HTTPConnect dialer: %v", err) - } - cs.egressToDialer[name] = contextDialer - } else { - return nil, fmt.Errorf("Either a TCP or UDS transport must be specified") - } - case apiserver.ProtocolGRPC: - if service.Connection.Transport.UDS != nil { - grpcContextDialer, err := createGRPCUDSDialer(service.Connection.Transport.UDS.UDSName) - if err != nil { - return nil, fmt.Errorf("failed to create grpc dialer: %v", err) - } - cs.egressToDialer[name] = grpcContextDialer - - } else { - return nil, fmt.Errorf("UDS transport must be specified for GRPC") 
- } - case apiserver.ProtocolDirect: - cs.egressToDialer[name] = directDialer - default: - return nil, fmt.Errorf("unrecognized service connection protocol %q", service.Connection.ProxyProtocol) + dialerCreator, err := connectionToDialerCreator(service.Connection) + if err != nil { + return nil, fmt.Errorf("failed to create dialer for egressSelection %q: %v", name, err) } + cs.egressToDialer[name] = dialerCreator.createDialer() } return cs, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go index dcd51906f02..8ff72fe8a9e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/egress_selector_test.go @@ -18,12 +18,19 @@ package egressselector import ( "context" + "fmt" "net" + "strings" "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/apis/apiserver" + "k8s.io/apiserver/pkg/server/egressselector/metrics" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" ) type fakeEgressSelection struct { @@ -163,3 +170,97 @@ func validateDirectDialer(dialer utilnet.DialFunc, s *fakeEgressSelection) (bool } return s.directDialerCalled, nil } + +type fakeProxyServerConnector struct { + connectorErr bool + proxierErr bool +} + +func (f *fakeProxyServerConnector) connect() (proxier, error) { + if f.connectorErr { + return nil, fmt.Errorf("fake error") + } + return &fakeProxier{err: f.proxierErr}, nil +} + +type fakeProxier struct { + err bool +} + +func (f *fakeProxier) proxy(_ string) (net.Conn, error) { + if f.err { + return nil, fmt.Errorf("fake error") + } + return nil, nil +} + +func TestMetrics(t *testing.T) { + testcases := map[string]struct { + connectorErr bool + proxierErr bool + metrics []string 
+ want string + }{ + "connect to proxy server error": { + connectorErr: true, + proxierErr: false, + metrics: []string{"apiserver_egress_dialer_dial_failure_count"}, + want: ` + # HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed + # TYPE apiserver_egress_dialer_dial_failure_count counter + apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="connect",transport="fake_transport"} 1 +`, + }, + "connect succeeded, proxy failed": { + connectorErr: false, + proxierErr: true, + metrics: []string{"apiserver_egress_dialer_dial_failure_count"}, + want: ` + # HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed + # TYPE apiserver_egress_dialer_dial_failure_count counter + apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="proxy",transport="fake_transport"} 1 +`, + }, + "successful": { + connectorErr: false, + proxierErr: false, + metrics: []string{"apiserver_egress_dialer_dial_duration_seconds"}, + want: ` + # HELP apiserver_egress_dialer_dial_duration_seconds [ALPHA] Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds) + # TYPE apiserver_egress_dialer_dial_duration_seconds histogram + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.005"} 1 + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.025"} 1 + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.1"} 1 + 
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.5"} 1 + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="2.5"} 1 + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="12.5"} 1 + apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="+Inf"} 1 + apiserver_egress_dialer_dial_duration_seconds_sum{protocol="fake_protocol",transport="fake_transport"} 0 + apiserver_egress_dialer_dial_duration_seconds_count{protocol="fake_protocol",transport="fake_transport"} 1 +`, + }, + } + for tn, tc := range testcases { + + t.Run(tn, func(t *testing.T) { + metrics.Metrics.Reset() + metrics.Metrics.SetClock(clock.NewFakeClock(time.Now())) + d := dialerCreator{ + connector: &fakeProxyServerConnector{ + connectorErr: tc.connectorErr, + proxierErr: tc.proxierErr, + }, + options: metricsOptions{ + transport: "fake_transport", + protocol: "fake_protocol", + }, + } + dialer := d.createDialer() + dialer(context.TODO(), "", "") + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tc.want), tc.metrics...); err != nil { + t.Errorf("Err in comparing metrics %v", err) + } + }) + } + +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go index 67422de25b7..04ad61c4fd0 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/metrics.go @@ -19,6 +19,7 @@ package metrics import ( "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" ) @@ -29,22 +30,21 @@ const ( // ProtocolHTTPConnect means that the proxy protocol is http-connect. 
ProtocolHTTPConnect = "http_connect" - // ProtocolHTTPGRPC means that the proxy protocol is the GRPC protocol. + // ProtocolGRPC means that the proxy protocol is the GRPC protocol. ProtocolGRPC = "grpc" // TransportTCP means that the transport is TCP. TransportTCP = "tcp" // TransportUDS means that the transport is UDS. TransportUDS = "uds" - // StageTransport indicates that the dial failed at dialing to the proxy server. - StageDial = "dial" - // StageProtocol indicates that the dial failed at requesting the proxy server to proxy. + // StageConnect indicates that the dial failed at establishing connection to the proxy server. + StageConnect = "connect" + // StageProxy indicates that the dial failed at requesting the proxy server to proxy. StageProxy = "proxy" ) var ( // Use buckets ranging from 5 ms to 12.5 seconds. - latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5} - latencySummaryMaxAge = 5 * time.Hour + latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5} // Metrics provides access to all dial metrics. Metrics = newDialMetrics() @@ -52,6 +52,7 @@ var ( // DialMetrics instruments dials to proxy server with prometheus metrics. type DialMetrics struct { + clock clock.Clock latencies *metrics.HistogramVec failures *metrics.CounterVec } @@ -75,7 +76,7 @@ func newDialMetrics() *DialMetrics { Namespace: namespace, Subsystem: subsystem, Name: "dial_failure_count", - Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (dial or proxy). The stage indicates at which stage the dial failed", + Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). 
The stage indicates at which stage the dial failed", StabilityLevel: metrics.ALPHA, }, []string{"protocol", "transport", "stage"}, @@ -83,7 +84,17 @@ func newDialMetrics() *DialMetrics { legacyregistry.MustRegister(latencies) legacyregistry.MustRegister(failures) - return &DialMetrics{latencies: latencies, failures: failures} + return &DialMetrics{latencies: latencies, failures: failures, clock: clock.RealClock{}} +} + +// Clock returns the clock. +func (m *DialMetrics) Clock() clock.Clock { + return m.clock +} + +// SetClock sets the clock. +func (m *DialMetrics) SetClock(c clock.Clock) { + m.clock = c } // Reset resets the metrics. @@ -97,7 +108,7 @@ func (m *DialMetrics) ObserveDialLatency(elapsed time.Duration, protocol, transp m.latencies.WithLabelValues(protocol, transport).Observe(elapsed.Seconds()) } -// ObserverDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at. +// ObserveDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at. 
func (m *DialMetrics) ObserveDialFailure(protocol, transport, stage string) { m.failures.WithLabelValues(protocol, transport, stage).Inc() } From 1e78fc0bf03c344165f792331999b590c602708d Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Wed, 26 Feb 2020 16:01:24 -0800 Subject: [PATCH 3/3] generated --- .../apiserver/pkg/server/egressselector/BUILD | 11 +++++++- .../pkg/server/egressselector/metrics/BUILD | 28 +++++++++++++++++++ vendor/modules.txt | 1 + 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/BUILD diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD b/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD index abff3b68fe6..a83cb2c8ec3 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/BUILD @@ -16,9 +16,11 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/install:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver/v1beta1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/path:go_default_library", + "//vendor/k8s.io/utils/trace:go_default_library", "//vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", ], @@ -33,7 +35,10 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:all-srcs", + ], tags = ["automanaged"], visibility = ["//visibility:public"], ) @@ -47,7 +52,11 @@ go_test( embed = [":go_default_library"], deps = [ 
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/apiserver:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/BUILD b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/BUILD new file mode 100644 index 00000000000..99f1a0343ea --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/egressselector/metrics/BUILD @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["metrics.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/egressselector/metrics", + importpath = "k8s.io/apiserver/pkg/server/egressselector/metrics", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//staging/src/k8s.io/component-base/metrics:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/vendor/modules.txt b/vendor/modules.txt index 75809ef888e..7630d2543da 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1348,6 +1348,7 @@ k8s.io/apiserver/pkg/registry/rest/resttest k8s.io/apiserver/pkg/server k8s.io/apiserver/pkg/server/dynamiccertificates 
k8s.io/apiserver/pkg/server/egressselector +k8s.io/apiserver/pkg/server/egressselector/metrics k8s.io/apiserver/pkg/server/filters k8s.io/apiserver/pkg/server/healthz k8s.io/apiserver/pkg/server/httplog