Merge pull request #58319 from vikaschoudhary16/grpc-conn-error

Automatic merge from submit-queue (batch tested with PRs 58319, 58345). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Call Dial in blocking mode

**What this PR does / why we need it**:
# Tests which uncover underlying problem
On the current master code:
1. comment out stub plugin server start [here](https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cm/deviceplugin/endpoint_test.go#L127-L128)
2. run `go test  -v k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin --run TestNewEndpoint --count 1`. Test succeeds.
3. run `go test  -v k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin --run TestRun --count 1`. Test hangs with:
> listAndWatch ended unexpectedly for device plugin mock with error rpc error: code = Unavailable desc = grpc: the connection is unavailable

`2` does not fail even though it invokes Dial and no listening server is running. It is because currently Dial is not waiting till the connection turns to Ready state. And this case does not invokes any RPC call over the `conn` returned by `Dial`

`3` hangs because this tests involves actual usage of `conn`(client) and there it does not find listening server because we dint start and deliberately stopped in `1`.

`Dial` should be using `WithBlock` option which ensures that `conn` is returned only when connection is in `Ready` state.

After using `WithBlock` in the `Dial`, in this PR, if `1`, `2` and `3` are repeated. Both, `2` and `3`, fails at `Dial`, which is expected behavior. By `fail`, I meant Dial blocks forever if `WithTimeout` is not used or otherwise, a timeout failure.



**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #58281

**Special notes for your reviewer**:

**Release note**:

```release-note
None
```
/area hw-accelerators
/sig node
/cc  @jiayingz @RenaudWasTaken @vishh @ScorpioCPH @sjenning @derekwaynecarr @jeremyeder @lichuqiang @tengqm
This commit is contained in:
Kubernetes Submit Queue 2018-01-16 19:12:28 -08:00 committed by GitHub
commit 43924b1d45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 13 deletions

View File

@ -86,14 +86,11 @@ func (m *Stub) Start() error {
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
// Wait till grpc server is ready.
for i := 0; i < 10; i++ {
services := m.server.GetServiceInfo()
if len(services) > 0 {
break
}
time.Sleep(1 * time.Second)
_, conn, err := dial(m.socket)
if err != nil {
return err
}
conn.Close()
log.Println("Starting to serve on", m.socket)
return nil
@ -109,7 +106,8 @@ func (m *Stub) Stop() error {
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(10*time.Second),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))

View File

@ -186,9 +186,10 @@ func (e *endpointImpl) stop() {
e.clientConn.Close()
}
// dial establishes the gRPC communication with the registered device plugin.
// dial establishes the gRPC communication with the registered device plugin. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(10*time.Second),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),

View File

@ -497,9 +497,6 @@ type TestResource struct {
func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
var logLevel string
flag.StringVar(&logLevel, "logLevel", "4", "test")
flag.Lookup("v").Value.Set(logLevel)
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),