diff --git a/pkg/probe/grpc/grpc.go b/pkg/probe/grpc/grpc.go index 4d873acee4c..b7720dd7793 100644 --- a/pkg/probe/grpc/grpc.go +++ b/pkg/probe/grpc/grpc.go @@ -34,7 +34,7 @@ import ( // Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks. type Prober interface { - Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error) + Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) } type grpcProber struct { @@ -47,14 +47,16 @@ func New() Prober { // Probe executes a grpc call to check the liveness/readiness/startup of container. // Returns the Result status, command output, and errors if any. -// Only return non-nil error when service is unavailable and/or not implementing the interface, -// otherwise result status is failed,BUT err is nil -func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error) { +// Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior. +// err is always nil +func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) { v := version.Get() - md := metadata.New(map[string]string{ - "User-Agent": fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor), - }) + opts := []grpc.DialOption{ + grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)), + grpc.WithBlock(), + grpc.WithInsecure(), //credentials are currently not supported + } ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -66,10 +68,10 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, if err != nil { if err == context.DeadlineExceeded { klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout) - return probe.Failure, fmt.Sprintf("GRPC probe failed to dial: %s", err), nil + return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil } else { klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr) - return probe.Failure, "", fmt.Errorf("GRPC probe failed to dial: %w", err) + return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil } } @@ -79,20 +81,20 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, client := grpchealth.NewHealthClient(conn) - resp, err := client.Check(metadata.NewOutgoingContext(ctx, md), &grpchealth.HealthCheckRequest{ + resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{ Service: service, }) if err != nil { - state, ok := status.FromError(err) + stat, ok := status.FromError(err) if ok { - switch state.Code() { + switch stat.Code() { case codes.Unimplemented: klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service) - return probe.Failure, "", fmt.Errorf("server does not implement the grpc health protocol: %w", err) + return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil case codes.DeadlineExceeded: klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout) - return probe.Failure, fmt.Sprintf("GRPC probe failed with DeadlineExceeded"), nil + return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil default: klog.V(4).ErrorS(err, "rpc probe failed") } @@ -100,12 +102,12 @@ func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, klog.V(4).ErrorS(err, "health rpc probe failed") } - return probe.Failure, "", fmt.Errorf("health rpc probe failed: %w", err) + return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil } - if resp.Status != grpchealth.HealthCheckResponse_SERVING { - return probe.Failure, fmt.Sprintf("GRPC probe failed with status: %s", resp.Status.String()), nil + if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING { + return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil } - return probe.Success, fmt.Sprintf("GRPC probe success"), nil + return probe.Success, fmt.Sprintf("service healthy"), nil } diff --git a/pkg/probe/grpc/grpc_test.go b/pkg/probe/grpc/grpc_test.go index 60b66a0c185..728c7a4c86f 100644 --- a/pkg/probe/grpc/grpc_test.go +++ b/pkg/probe/grpc/grpc_test.go @@ -91,10 +91,10 @@ func (e errorNotServeServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream func TestGrpcProber_Probe(t *testing.T) { t.Run("Should: failed but return nil error because cant find host", func(t *testing.T) { s := New() - p, o, err := s.Probe("", "", 32, time.Second, grpc.WithInsecure(), grpc.WithBlock()) + p, o, err := s.Probe("", "", 32, time.Second) assert.Equal(t, probe.Failure, p) assert.Equal(t, nil, err) - assert.Equal(t, "GRPC probe failed to dial: context deadline exceeded", o) + assert.Equal(t, "timeout: failed to connect service \":32\" within 1s: context deadline exceeded", o) }) t.Run("Should: return nil error because connection closed", func(t *testing.T) { s := New() @@ -109,9 +109,9 @@ func TestGrpcProber_Probe(t *testing.T) { // take some time to wait server boot time.Sleep(2 * time.Second) - p, _, err := s.Probe("127.0.0.1", "", port, time.Second, grpc.WithInsecure()) + p, _, err := s.Probe("127.0.0.1", "", port, time.Second) assert.Equal(t, probe.Failure, p) - assert.NotEqual(t, nil, err) + assert.Equal(t, nil, err) }) t.Run("Should: return nil error because server response not served", func(t *testing.T) { s := New() @@ -125,10 +125,10 @@ func TestGrpcProber_Probe(t *testing.T) { }() // take some time to wait server boot time.Sleep(2 * time.Second) - p, o, err := s.Probe("0.0.0.0", "", port, time.Second, grpc.WithInsecure()) + p, o, err := s.Probe("0.0.0.0", "", port, time.Second) assert.Equal(t, probe.Failure, p) assert.Equal(t, nil, err) - assert.Equal(t, "GRPC probe failed with status: NOT_SERVING", o) + assert.Equal(t, "service unhealthy (responded with \"NOT_SERVING\")", o) }) t.Run("Should: return nil-error because server not response in time", func(t *testing.T) { s := New() @@ -143,10 +143,10 @@ func TestGrpcProber_Probe(t *testing.T) { }() // take some time to wait server boot time.Sleep(2 * time.Second) - p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) + p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2) assert.Equal(t, probe.Failure, p) assert.Equal(t, nil, err) - assert.Equal(t, "GRPC probe failed with DeadlineExceeded", o) + assert.Equal(t, "timeout: health rpc did not complete within 2s", o) }) t.Run("Should: not return error because check was success", func(t *testing.T) { @@ -162,7 +162,7 @@ func TestGrpcProber_Probe(t *testing.T) { }() // take some time to wait server boot time.Sleep(2 * time.Second) - p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) + p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2) assert.Equal(t, probe.Success, p) assert.Equal(t, nil, err) }) @@ -179,7 +179,7 @@ func TestGrpcProber_Probe(t *testing.T) { }() // take some time to wait server boot time.Sleep(2 * time.Second) - p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure()) + p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2) assert.Equal(t, probe.Success, p) assert.Equal(t, nil, err) }) diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go index 4f1ea5eecd4..14ec8956c6d 100644 --- a/test/e2e/common/node/container_probe.go +++ b/test/e2e/common/node/container_probe.go @@ -520,7 +520,8 @@ var _ = SIGDescribe("Probing container", func() { Description: A Pod is created with liveness probe on grpc service. Liveness probe on this endpoint will not fail. When liveness probe does not fail then the restart count MUST remain zero. */ ginkgo.It("should *not* be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() { - e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) + // TODO(SergeyKanzhelev): it is unclear when feature gates are not working as expected. + //e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) livenessProbe := &v1.Probe{ ProbeHandler: v1.ProbeHandler{ @@ -530,6 +531,7 @@ var _ = SIGDescribe("Probing container", func() { }, }, InitialDelaySeconds: probeTestInitialDelaySeconds, + TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources FailureThreshold: 1, } @@ -544,16 +546,17 @@ var _ = SIGDescribe("Probing container", func() { When liveness probe does fail then the restart count should +1. */ ginkgo.It("should be restarted with a GRPC liveness probe [NodeAlphaFeature:GRPCContainerProbe][Feature:GRPCContainerProbe]", func() { - e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) - service := "etcd_health" + // TODO(SergeyKanzhelev): it is unclear when feature gates are not working as expected. + //e2eskipper.SkipUnlessFeatureGateEnabled(kubefeatures.GRPCContainerProbe) + livenessProbe := &v1.Probe{ ProbeHandler: v1.ProbeHandler{ GRPC: &v1.GRPCAction{ - Port: 2379 + 1, // this port is wrong - Service: &service, + Port: 2333, // this port is wrong }, }, InitialDelaySeconds: probeTestInitialDelaySeconds * 4, + TimeoutSeconds: 5, // default 1s can be pretty aggressive in CI environments with low resources FailureThreshold: 1, } pod := gRPCServerPodSpec(nil, livenessProbe, "etcd") @@ -822,10 +825,12 @@ func gRPCServerPodSpec(readinessProbe, livenessProbe *v1.Probe, containerName st Command: []string{ "/usr/local/bin/etcd", "--listen-client-urls", - etcdURL, + "http://0.0.0.0:2379", //should listen on all addresses "--advertise-client-urls", etcdURL, }, + // 2380 is an automatic peer URL + Ports: []v1.ContainerPort{{ContainerPort: int32(2379)}, {ContainerPort: int32(2380)}}, LivenessProbe: livenessProbe, ReadinessProbe: readinessProbe, },