mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
refactor egress dialer construction code and add unit test
This commit is contained in:
parent
fbb1fb8902
commit
bac9351c64
@ -30,6 +30,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver"
|
||||
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
|
||||
@ -127,16 +128,122 @@ func tunnelHTTPConnect(proxyConn net.Conn, proxyAddress, addr string) (net.Conn,
|
||||
return proxyConn, nil
|
||||
}
|
||||
|
||||
func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialFunc, error) {
|
||||
clientCert := tcpTransport.TLSConfig.ClientCert
|
||||
clientKey := tcpTransport.TLSConfig.ClientKey
|
||||
caCert := tcpTransport.TLSConfig.CABundle
|
||||
proxyURL, err := url.Parse(tcpTransport.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid proxy server url %q: %v", tcpTransport.URL, err)
|
||||
}
|
||||
proxyAddress := proxyURL.Host
|
||||
type proxier interface {
|
||||
// proxy returns a connection to addr.
|
||||
proxy(addr string) (net.Conn, error)
|
||||
}
|
||||
|
||||
var _ proxier = &httpConnectProxier{}
|
||||
|
||||
type httpConnectProxier struct {
|
||||
conn net.Conn
|
||||
proxyAddress string
|
||||
}
|
||||
|
||||
func (t *httpConnectProxier) proxy(addr string) (net.Conn, error) {
|
||||
return tunnelHTTPConnect(t.conn, t.proxyAddress, addr)
|
||||
}
|
||||
|
||||
var _ proxier = &grpcProxier{}
|
||||
|
||||
type grpcProxier struct {
|
||||
tunnel client.Tunnel
|
||||
}
|
||||
|
||||
func (g *grpcProxier) proxy(addr string) (net.Conn, error) {
|
||||
return g.tunnel.Dial("tcp", addr)
|
||||
}
|
||||
|
||||
type proxyServerConnector interface {
|
||||
// connect establishes connection to the proxy server, and returns a
|
||||
// proxier based on the connection.
|
||||
connect() (proxier, error)
|
||||
}
|
||||
|
||||
type tcpHTTPConnectConnector struct {
|
||||
proxyAddress string
|
||||
tlsConfig *tls.Config
|
||||
}
|
||||
|
||||
func (t *tcpHTTPConnectConnector) connect() (proxier, error) {
|
||||
conn, err := tls.Dial("tcp", t.proxyAddress, t.tlsConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &httpConnectProxier{conn: conn, proxyAddress: t.proxyAddress}, nil
|
||||
}
|
||||
|
||||
type udsHTTPConnectConnector struct {
|
||||
udsName string
|
||||
}
|
||||
|
||||
func (u *udsHTTPConnectConnector) connect() (proxier, error) {
|
||||
conn, err := net.Dial("unix", u.udsName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &httpConnectProxier{conn: conn, proxyAddress: u.udsName}, nil
|
||||
}
|
||||
|
||||
type udsGRPCConnector struct {
|
||||
udsName string
|
||||
}
|
||||
|
||||
func (u *udsGRPCConnector) connect() (proxier, error) {
|
||||
udsName := u.udsName
|
||||
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||
c, err := net.Dial("unix", udsName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
|
||||
}
|
||||
return c, err
|
||||
})
|
||||
|
||||
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &grpcProxier{tunnel: tunnel}, nil
|
||||
}
|
||||
|
||||
type dialerCreator struct {
|
||||
connector proxyServerConnector
|
||||
direct bool
|
||||
options metricsOptions
|
||||
}
|
||||
|
||||
type metricsOptions struct {
|
||||
transport string
|
||||
protocol string
|
||||
}
|
||||
|
||||
func (d *dialerCreator) createDialer() utilnet.DialFunc {
|
||||
if d.direct {
|
||||
return directDialer
|
||||
}
|
||||
return func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
trace := utiltrace.New(fmt.Sprintf("Proxy via HTTP Connect over %s", d.options.transport), utiltrace.Field{Key: "address", Value: addr})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
start := egressmetrics.Metrics.Clock().Now()
|
||||
proxier, err := d.connector.connect()
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageConnect)
|
||||
return nil, err
|
||||
}
|
||||
conn, err := proxier.proxy(addr)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(d.options.protocol, d.options.transport, egressmetrics.StageProxy)
|
||||
return nil, err
|
||||
}
|
||||
egressmetrics.Metrics.ObserveDialLatency(egressmetrics.Metrics.Clock().Now().Sub(start), d.options.protocol, d.options.transport)
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
|
||||
func getTLSConfig(t *apiserver.TLSConfig) (*tls.Config, error) {
|
||||
clientCert := t.ClientCert
|
||||
clientKey := t.ClientKey
|
||||
caCert := t.CABundle
|
||||
clientCerts, err := tls.LoadX509KeyPair(clientCert, clientKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read key pair %s & %s, got %v", clientCert, clientKey, err)
|
||||
@ -155,81 +262,75 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
|
||||
// Use host's root CA set instead of providing our own
|
||||
certPool = nil
|
||||
}
|
||||
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
trace := utiltrace.New("Proxy via HTTP Connect over TCP", utiltrace.Field{Key: "address", Value: addr})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
klog.V(4).Infof("Sending request to %q.", addr)
|
||||
start := time.Now()
|
||||
proxyConn, err := tls.Dial("tcp", proxyAddress,
|
||||
&tls.Config{
|
||||
Certificates: []tls.Certificate{clientCerts},
|
||||
RootCAs: certPool,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageDial)
|
||||
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
|
||||
}
|
||||
ret, err := tunnelHTTPConnect(proxyConn, proxyAddress, addr)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageProxy)
|
||||
return nil, err
|
||||
}
|
||||
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP)
|
||||
return ret, nil
|
||||
}
|
||||
return contextDialer, nil
|
||||
return &tls.Config{
|
||||
Certificates: []tls.Certificate{clientCerts},
|
||||
RootCAs: certPool,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) {
|
||||
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
trace := utiltrace.New("Proxy via HTTP Connect over UDS", utiltrace.Field{Key: "address", Value: addr})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
start := time.Now()
|
||||
proxyConn, err := net.Dial("unix", udsConfig.UDSName)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageDial)
|
||||
return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err)
|
||||
}
|
||||
ret, err := tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageProxy)
|
||||
return nil, err
|
||||
}
|
||||
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS)
|
||||
return ret, nil
|
||||
func getProxyAddress(urlString string) (string, error) {
|
||||
proxyURL, err := url.Parse(urlString)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("invalid proxy server url %q: %v", urlString, err)
|
||||
}
|
||||
return contextDialer, nil
|
||||
return proxyURL.Host, nil
|
||||
}
|
||||
|
||||
func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
|
||||
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
trace := utiltrace.New("Proxy via GRPC over UDS", utiltrace.Field{Key: "address", Value: addr})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
start := time.Now()
|
||||
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||
c, err := net.Dial("unix", udsName)
|
||||
func connectionToDialerCreator(c apiserver.Connection) (*dialerCreator, error) {
|
||||
switch c.ProxyProtocol {
|
||||
|
||||
case apiserver.ProtocolHTTPConnect:
|
||||
if c.Transport.UDS != nil {
|
||||
return &dialerCreator{
|
||||
connector: &udsHTTPConnectConnector{
|
||||
udsName: c.Transport.UDS.UDSName,
|
||||
},
|
||||
options: metricsOptions{
|
||||
transport: egressmetrics.TransportUDS,
|
||||
protocol: egressmetrics.ProtocolHTTPConnect,
|
||||
},
|
||||
}, nil
|
||||
} else if c.Transport.TCP != nil {
|
||||
tlsConfig, err := getTLSConfig(c.Transport.TCP.TLSConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to create connection to uds name %s, error: %v", udsName, err)
|
||||
return nil, err
|
||||
}
|
||||
return c, err
|
||||
})
|
||||
|
||||
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageDial)
|
||||
return nil, err
|
||||
proxyAddress, err := getProxyAddress(c.Transport.TCP.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &dialerCreator{
|
||||
connector: &tcpHTTPConnectConnector{
|
||||
tlsConfig: tlsConfig,
|
||||
proxyAddress: proxyAddress,
|
||||
},
|
||||
options: metricsOptions{
|
||||
transport: egressmetrics.TransportTCP,
|
||||
protocol: egressmetrics.ProtocolHTTPConnect,
|
||||
},
|
||||
}, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
|
||||
}
|
||||
|
||||
proxyConn, err := tunnel.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageProxy)
|
||||
return nil, err
|
||||
case apiserver.ProtocolGRPC:
|
||||
if c.Transport.UDS != nil {
|
||||
return &dialerCreator{
|
||||
connector: &udsGRPCConnector{
|
||||
udsName: c.Transport.UDS.UDSName,
|
||||
},
|
||||
options: metricsOptions{
|
||||
transport: egressmetrics.TransportUDS,
|
||||
protocol: egressmetrics.ProtocolGRPC,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS)
|
||||
return proxyConn, nil
|
||||
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
|
||||
case apiserver.ProtocolDirect:
|
||||
return &dialerCreator{direct: true}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized service connection protocol %q", c.ProxyProtocol)
|
||||
}
|
||||
return contextDialer, nil
|
||||
|
||||
}
|
||||
|
||||
// NewEgressSelector configures lookup mechanism for Lookup.
|
||||
@ -247,40 +348,11 @@ func NewEgressSelector(config *apiserver.EgressSelectorConfiguration) (*EgressSe
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch service.Connection.ProxyProtocol {
|
||||
|
||||
case apiserver.ProtocolHTTPConnect:
|
||||
if service.Connection.Transport.UDS != nil {
|
||||
contextDialer, err := createConnectUDSDialer(service.Connection.Transport.UDS)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create HTTPConnect uds dialer: %v", err)
|
||||
}
|
||||
cs.egressToDialer[name] = contextDialer
|
||||
} else if service.Connection.Transport.TCP != nil {
|
||||
contextDialer, err := createConnectTCPDialer(service.Connection.Transport.TCP)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create HTTPConnect dialer: %v", err)
|
||||
}
|
||||
cs.egressToDialer[name] = contextDialer
|
||||
} else {
|
||||
return nil, fmt.Errorf("Either a TCP or UDS transport must be specified")
|
||||
}
|
||||
case apiserver.ProtocolGRPC:
|
||||
if service.Connection.Transport.UDS != nil {
|
||||
grpcContextDialer, err := createGRPCUDSDialer(service.Connection.Transport.UDS.UDSName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create grpc dialer: %v", err)
|
||||
}
|
||||
cs.egressToDialer[name] = grpcContextDialer
|
||||
|
||||
} else {
|
||||
return nil, fmt.Errorf("UDS transport must be specified for GRPC")
|
||||
}
|
||||
case apiserver.ProtocolDirect:
|
||||
cs.egressToDialer[name] = directDialer
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized service connection protocol %q", service.Connection.ProxyProtocol)
|
||||
dialerCreator, err := connectionToDialerCreator(service.Connection)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create dialer for egressSelection %q: %v", name, err)
|
||||
}
|
||||
cs.egressToDialer[name] = dialerCreator.createDialer()
|
||||
}
|
||||
return cs, nil
|
||||
}
|
||||
|
@ -18,12 +18,19 @@ package egressselector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver"
|
||||
"k8s.io/apiserver/pkg/server/egressselector/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
)
|
||||
|
||||
type fakeEgressSelection struct {
|
||||
@ -163,3 +170,97 @@ func validateDirectDialer(dialer utilnet.DialFunc, s *fakeEgressSelection) (bool
|
||||
}
|
||||
return s.directDialerCalled, nil
|
||||
}
|
||||
|
||||
type fakeProxyServerConnector struct {
|
||||
connectorErr bool
|
||||
proxierErr bool
|
||||
}
|
||||
|
||||
func (f *fakeProxyServerConnector) connect() (proxier, error) {
|
||||
if f.connectorErr {
|
||||
return nil, fmt.Errorf("fake error")
|
||||
}
|
||||
return &fakeProxier{err: f.proxierErr}, nil
|
||||
}
|
||||
|
||||
type fakeProxier struct {
|
||||
err bool
|
||||
}
|
||||
|
||||
func (f *fakeProxier) proxy(_ string) (net.Conn, error) {
|
||||
if f.err {
|
||||
return nil, fmt.Errorf("fake error")
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestMetrics(t *testing.T) {
|
||||
testcases := map[string]struct {
|
||||
connectorErr bool
|
||||
proxierErr bool
|
||||
metrics []string
|
||||
want string
|
||||
}{
|
||||
"connect to proxy server error": {
|
||||
connectorErr: true,
|
||||
proxierErr: false,
|
||||
metrics: []string{"apiserver_egress_dialer_dial_failure_count"},
|
||||
want: `
|
||||
# HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed
|
||||
# TYPE apiserver_egress_dialer_dial_failure_count counter
|
||||
apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="connect",transport="fake_transport"} 1
|
||||
`,
|
||||
},
|
||||
"connect succeeded, proxy failed": {
|
||||
connectorErr: false,
|
||||
proxierErr: true,
|
||||
metrics: []string{"apiserver_egress_dialer_dial_failure_count"},
|
||||
want: `
|
||||
# HELP apiserver_egress_dialer_dial_failure_count [ALPHA] Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed
|
||||
# TYPE apiserver_egress_dialer_dial_failure_count counter
|
||||
apiserver_egress_dialer_dial_failure_count{protocol="fake_protocol",stage="proxy",transport="fake_transport"} 1
|
||||
`,
|
||||
},
|
||||
"successful": {
|
||||
connectorErr: false,
|
||||
proxierErr: false,
|
||||
metrics: []string{"apiserver_egress_dialer_dial_duration_seconds"},
|
||||
want: `
|
||||
# HELP apiserver_egress_dialer_dial_duration_seconds [ALPHA] Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds)
|
||||
# TYPE apiserver_egress_dialer_dial_duration_seconds histogram
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.005"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.025"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.1"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="0.5"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="2.5"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="12.5"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_bucket{protocol="fake_protocol",transport="fake_transport",le="+Inf"} 1
|
||||
apiserver_egress_dialer_dial_duration_seconds_sum{protocol="fake_protocol",transport="fake_transport"} 0
|
||||
apiserver_egress_dialer_dial_duration_seconds_count{protocol="fake_protocol",transport="fake_transport"} 1
|
||||
`,
|
||||
},
|
||||
}
|
||||
for tn, tc := range testcases {
|
||||
|
||||
t.Run(tn, func(t *testing.T) {
|
||||
metrics.Metrics.Reset()
|
||||
metrics.Metrics.SetClock(clock.NewFakeClock(time.Now()))
|
||||
d := dialerCreator{
|
||||
connector: &fakeProxyServerConnector{
|
||||
connectorErr: tc.connectorErr,
|
||||
proxierErr: tc.proxierErr,
|
||||
},
|
||||
options: metricsOptions{
|
||||
transport: "fake_transport",
|
||||
protocol: "fake_protocol",
|
||||
},
|
||||
}
|
||||
dialer := d.createDialer()
|
||||
dialer(context.TODO(), "", "")
|
||||
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tc.want), tc.metrics...); err != nil {
|
||||
t.Errorf("Err in comparing metrics %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package metrics
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
@ -29,22 +30,21 @@ const (
|
||||
|
||||
// ProtocolHTTPConnect means that the proxy protocol is http-connect.
|
||||
ProtocolHTTPConnect = "http_connect"
|
||||
// ProtocolHTTPGRPC means that the proxy protocol is the GRPC protocol.
|
||||
// ProtocolGRPC means that the proxy protocol is the GRPC protocol.
|
||||
ProtocolGRPC = "grpc"
|
||||
// TransportTCP means that the transport is TCP.
|
||||
TransportTCP = "tcp"
|
||||
// TransportUDS means that the transport is UDS.
|
||||
TransportUDS = "uds"
|
||||
// StageTransport indicates that the dial failed at dialing to the proxy server.
|
||||
StageDial = "dial"
|
||||
// StageProtocol indicates that the dial failed at requesting the proxy server to proxy.
|
||||
// StageConnect indicates that the dial failed at establishing connection to the proxy server.
|
||||
StageConnect = "connect"
|
||||
// StageProxy indicates that the dial failed at requesting the proxy server to proxy.
|
||||
StageProxy = "proxy"
|
||||
)
|
||||
|
||||
var (
|
||||
// Use buckets ranging from 5 ms to 12.5 seconds.
|
||||
latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5}
|
||||
latencySummaryMaxAge = 5 * time.Hour
|
||||
latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5}
|
||||
|
||||
// Metrics provides access to all dial metrics.
|
||||
Metrics = newDialMetrics()
|
||||
@ -52,6 +52,7 @@ var (
|
||||
|
||||
// DialMetrics instruments dials to proxy server with prometheus metrics.
|
||||
type DialMetrics struct {
|
||||
clock clock.Clock
|
||||
latencies *metrics.HistogramVec
|
||||
failures *metrics.CounterVec
|
||||
}
|
||||
@ -75,7 +76,7 @@ func newDialMetrics() *DialMetrics {
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "dial_failure_count",
|
||||
Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (dial or proxy). The stage indicates at which stage the dial failed",
|
||||
Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (connect or proxy). The stage indicates at which stage the dial failed",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"protocol", "transport", "stage"},
|
||||
@ -83,7 +84,17 @@ func newDialMetrics() *DialMetrics {
|
||||
|
||||
legacyregistry.MustRegister(latencies)
|
||||
legacyregistry.MustRegister(failures)
|
||||
return &DialMetrics{latencies: latencies, failures: failures}
|
||||
return &DialMetrics{latencies: latencies, failures: failures, clock: clock.RealClock{}}
|
||||
}
|
||||
|
||||
// Clock returns the clock.
|
||||
func (m *DialMetrics) Clock() clock.Clock {
|
||||
return m.clock
|
||||
}
|
||||
|
||||
// SetClock sets the clock.
|
||||
func (m *DialMetrics) SetClock(c clock.Clock) {
|
||||
m.clock = c
|
||||
}
|
||||
|
||||
// Reset resets the metrics.
|
||||
@ -97,7 +108,7 @@ func (m *DialMetrics) ObserveDialLatency(elapsed time.Duration, protocol, transp
|
||||
m.latencies.WithLabelValues(protocol, transport).Observe(elapsed.Seconds())
|
||||
}
|
||||
|
||||
// ObserverDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at.
|
||||
// ObserveDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at.
|
||||
func (m *DialMetrics) ObserveDialFailure(protocol, transport, stage string) {
|
||||
m.failures.WithLabelValues(protocol, transport, stage).Inc()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user