fix the grpc probes

This commit is contained in:
Sergey Kanzhelev 2021-11-19 20:28:31 +00:00
parent c82a0f8ddc
commit f390d49e24
3 changed files with 41 additions and 34 deletions

View File

@ -34,7 +34,7 @@ import (
// Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks. // Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks.
type Prober interface { type Prober interface {
Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error)
} }
type grpcProber struct { type grpcProber struct {
@ -47,14 +47,16 @@ func New() Prober {
// Probe executes a grpc call to check the liveness/readiness/startup of container. // Probe executes a grpc call to check the liveness/readiness/startup of container.
// Returns the Result status, command output, and errors if any. // Returns the Result status, command output, and errors if any.
// Only return non-nil error when service is unavailable and/or not implementing the interface, // Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior.
// otherwise result status is failed,BUT err is nil // err is always nil
func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error) { func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) {
v := version.Get() v := version.Get()
md := metadata.New(map[string]string{ opts := []grpc.DialOption{
"User-Agent": fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor), grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)),
}) grpc.WithBlock(),
grpc.WithInsecure(), //credentials are currently not supported
}
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
@ -66,10 +68,10 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration,
if err != nil { if err != nil {
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout) klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("GRPC probe failed to dial: %s", err), nil return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil
} else { } else {
klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr) klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
return probe.Failure, "", fmt.Errorf("GRPC probe failed to dial: %w", err) return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil
} }
} }
@ -79,20 +81,20 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration,
client := grpchealth.NewHealthClient(conn) client := grpchealth.NewHealthClient(conn)
resp, err := client.Check(metadata.NewOutgoingContext(ctx, md), &grpchealth.HealthCheckRequest{ resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{
Service: service, Service: service,
}) })
if err != nil { if err != nil {
state, ok := status.FromError(err) stat, ok := status.FromError(err)
if ok { if ok {
switch state.Code() { switch stat.Code() {
case codes.Unimplemented: case codes.Unimplemented:
klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service) klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
return probe.Failure, "", fmt.Errorf("server does not implement the grpc health protocol: %w", err) return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil
case codes.DeadlineExceeded: case codes.DeadlineExceeded:
klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout) klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("GRPC probe failed with DeadlineExceeded"), nil return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil
default: default:
klog.V(4).ErrorS(err, "rpc probe failed") klog.V(4).ErrorS(err, "rpc probe failed")
} }
@ -100,12 +102,12 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration,
klog.V(4).ErrorS(err, "health rpc probe failed") klog.V(4).ErrorS(err, "health rpc probe failed")
} }
return probe.Failure, "", fmt.Errorf("health rpc probe failed: %w", err) return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil
} }
if resp.Status != grpchealth.HealthCheckResponse_SERVING { if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
return probe.Failure, fmt.Sprintf("GRPC probe failed with status: %s", resp.Status.String()), nil return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil
} }
return probe.Success, fmt.Sprintf("GRPC probe success"), nil return probe.Success, fmt.Sprintf("service healthy"), nil
} }

View File

@ -91,10 +91,10 @@ func (e errorNotServeServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream
func TestGrpcProber_Probe(t *testing.T) { func TestGrpcProber_Probe(t *testing.T) {
t.Run("Should: failed but return nil error because cant find host", func(t *testing.T) { t.Run("Should: failed but return nil error because cant find host", func(t *testing.T) {
s := New() s := New()
p, o, err := s.Probe("", "", 32, time.Second, grpc.WithInsecure(), grpc.WithBlock()) p, o, err := s.Probe("", "", 32, time.Second)
assert.Equal(t, probe.Failure, p) assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed to dial: context deadline exceeded", o) assert.Equal(t, "timeout: failed to connect service \":32\" within 1s: context deadline exceeded", o)
}) })
t.Run("Should: return nil error because connection closed", func(t *testing.T) { t.Run("Should: return nil error because connection closed", func(t *testing.T) {
s := New() s := New()
@ -109,9 +109,9 @@ func TestGrpcProber_Probe(t *testing.T) {
// take some time to wait server boot // take some time to wait server boot
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
p, _, err := s.Probe("127.0.0.1", "", port, time.Second, grpc.WithInsecure()) p, _, err := s.Probe("127.0.0.1", "", port, time.Second)
assert.Equal(t, probe.Failure, p) assert.Equal(t, probe.Failure, p)
assert.NotEqual(t, nil, err) assert.Equal(t, nil, err)
}) })
t.Run("Should: return nil error because server response not served", func(t *testing.T) { t.Run("Should: return nil error because server response not served", func(t *testing.T) {
s := New() s := New()
@ -125,10 +125,10 @@ func TestGrpcProber_Probe(t *testing.T) {
}() }()
// take some time to wait server boot // take some time to wait server boot
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
p, o, err := s.Probe("0.0.0.0", "", port, time.Second, grpc.WithInsecure()) p, o, err := s.Probe("0.0.0.0", "", port, time.Second)
assert.Equal(t, probe.Failure, p) assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed with status: NOT_SERVING", o) assert.Equal(t, "service unhealthy (responded with \"NOT_SERVING\")", o)
}) })
t.Run("Should: return nil-error because server not response in time", func(t *testing.T) { t.Run("Should: return nil-error because server not response in time", func(t *testing.T) {
s := New() s := New()
@ -143,10 +143,10 @@ func TestGrpcProber_Probe(t *testing.T) {
}() }()
// take some time to wait server boot // take some time to wait server boot
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2)
assert.Equal(t, probe.Failure, p) assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed with DeadlineExceeded", o) assert.Equal(t, "timeout: health rpc did not complete within 2s", o)
}) })
t.Run("Should: not return error because check was success", func(t *testing.T) { t.Run("Should: not return error because check was success", func(t *testing.T) {
@ -162,7 +162,7 @@ func TestGrpcProber_Probe(t *testing.T) {
}() }()
// take some time to wait server boot // take some time to wait server boot
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2)
assert.Equal(t, probe.Success, p) assert.Equal(t, probe.Success, p)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
}) })
@ -179,7 +179,7 @@ func TestGrpcProber_Probe(t *testing.T) {
}() }()
// take some time to wait server boot // take some time to wait server boot
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2)
assert.Equal(t, probe.Success, p) assert.Equal(t, probe.Success, p)
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
}) })

View File

@ -520,7 +520,8 @@ var _ = SIGDescribe("Probing container", func() {
Description: A Pod is created with liveness probe on grpc service. Liveness probe on this endpoint will not fail. When liveness probe does not fail then the restart count MUST remain zero. Description: A Pod is created with liveness probe on grpc service. Liveness probe on this endpoint will not fail. When liveness probe does not fail then the restart count MUST remain zero.
*/ */
ginkgo.It("should *not* be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() { ginkgo.It("should *not* be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() {
e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) // TODO(SergeyKanzhelev): it is unclear when feature gates are not working as expected.
//e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe)
livenessProbe := &v1.Probe{ livenessProbe := &v1.Probe{
ProbeHandler: v1.ProbeHandler{ ProbeHandler: v1.ProbeHandler{
@ -530,6 +531,7 @@ var _ = SIGDescribe("Probing container", func() {
}, },
}, },
InitialDelaySeconds: probeTestInitialDelaySeconds, InitialDelaySeconds: probeTestInitialDelaySeconds,
TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources
FailureThreshold: 1, FailureThreshold: 1,
} }
@ -544,16 +546,17 @@ var _ = SIGDescribe("Probing container", func() {
When liveness probe does fail then the restart count should +1. When liveness probe does fail then the restart count should +1.
*/ */
ginkgo.It("should be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() { ginkgo.It("should be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() {
e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) // TODO(SergeyKanzhelev): it is unclear when feature gates are not working as expected.
service := "etcd_health" //e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe)
livenessProbe := &v1.Probe{ livenessProbe := &v1.Probe{
ProbeHandler: v1.ProbeHandler{ ProbeHandler: v1.ProbeHandler{
GRPC: &v1.GRPCAction{ GRPC: &v1.GRPCAction{
Port: 2379 + 1, // this port is wrong Port: 2333, // this port is wrong
Service: &service,
}, },
}, },
InitialDelaySeconds: probeTestInitialDelaySeconds * 4, InitialDelaySeconds: probeTestInitialDelaySeconds * 4,
TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources
FailureThreshold: 1, FailureThreshold: 1,
} }
pod := gRPCServerPodSpec(nil, livenessProbe, "etcd") pod := gRPCServerPodSpec(nil, livenessProbe, "etcd")
@ -822,10 +825,12 @@ func gRPCServerPodSpec(readinessProbe, livenessProbe *v1.Probe, containerName st
Command: []string{ Command: []string{
"/usr/local/bin/etcd", "/usr/local/bin/etcd",
"--listen-client-urls", "--listen-client-urls",
etcdURL, "http://0.0.0.0:2379", //should listen on all addresses
"--advertise-client-urls", "--advertise-client-urls",
etcdURL, etcdURL,
}, },
// 2380 is an automatic peer URL
Ports: []v1.ContainerPort{{ContainerPort: int32(2379)}, {ContainerPort: int32(2380)}},
LivenessProbe: livenessProbe, LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe, ReadinessProbe: readinessProbe,
}, },