Merge pull request #52747 from RenaudWasTaken/connexion-closed

Automatic merge from submit-queue (batch tested with PRs 52747, 54329). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Device plugin endpoints now correctly close the client connection

**What this PR does / why we need it**:
Endpoints in the device plugin may end unexpectedly. Currently, when that happens, the gRPC client connection is not properly closed.
This commit fixes that by closing the client connection whenever an endpoint is stopped.
Related issue: #51993

**Special notes for your reviewer**: @jiayingz @mindprince @vishh 

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-10-20 20:41:06 -07:00 committed by GitHub
commit c1925d4944
2 changed files with 15 additions and 19 deletions

View File

@ -33,7 +33,8 @@ import (
// for managing gRPC communications with the device plugin and caching // for managing gRPC communications with the device plugin and caching
// device states reported by the device plugin. // device states reported by the device plugin.
type endpoint struct { type endpoint struct {
client pluginapi.DevicePluginClient client pluginapi.DevicePluginClient
clientConn *grpc.ClientConn
socketPath string socketPath string
resourceName string resourceName string
@ -42,32 +43,25 @@ type endpoint struct {
mutex sync.Mutex mutex sync.Mutex
callback MonitorCallback callback MonitorCallback
cancel context.CancelFunc
ctx context.Context
} }
// newEndpoint creates a new endpoint for the given resourceName. // newEndpoint creates a new endpoint for the given resourceName.
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) { func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
client, err := dial(socketPath) client, c, err := dial(socketPath)
if err != nil { if err != nil {
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err) glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
return nil, err return nil, err
} }
ctx, stop := context.WithCancel(context.Background())
return &endpoint{ return &endpoint{
client: client, client: client,
clientConn: c,
socketPath: socketPath, socketPath: socketPath,
resourceName: resourceName, resourceName: resourceName,
devices: nil, devices: nil,
callback: callback, callback: callback,
cancel: stop,
ctx: ctx,
}, nil }, nil
} }
@ -86,11 +80,9 @@ func (e *endpoint) getDevices() []pluginapi.Device {
// list initializes ListAndWatch gRPC call for the device plugin and gets the // list initializes ListAndWatch gRPC call for the device plugin and gets the
// initial list of the devices. Returns ListAndWatch gRPC stream on success. // initial list of the devices. Returns ListAndWatch gRPC stream on success.
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
glog.V(3).Infof("Starting List") stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{})
if err != nil { if err != nil {
glog.Errorf(errListAndWatch, e.resourceName, err) glog.Errorf(errListAndWatch, e.resourceName, err)
return nil, err return nil, err
} }
@ -190,7 +182,6 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
e.callback(e.resourceName, added, updated, deleted) e.callback(e.resourceName, added, updated, deleted)
} }
} }
// allocate issues Allocate gRPC call to the device plugin. // allocate issues Allocate gRPC call to the device plugin.
@ -201,11 +192,11 @@ func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error)
} }
func (e *endpoint) stop() { func (e *endpoint) stop() {
e.cancel() e.clientConn.Close()
} }
// dial establishes the gRPC communication with the registered device plugin. // dial establishes the gRPC communication with the registered device plugin.
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) { func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout) return net.DialTimeout("unix", addr, timeout)
@ -213,8 +204,8 @@ func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
) )
if err != nil { if err != nil {
return nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err) return nil, nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err)
} }
return pluginapi.NewDevicePluginClient(c), nil return pluginapi.NewDevicePluginClient(c), c, nil
} }

View File

@ -208,9 +208,11 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return return
} }
stream, err := e.list() stream, err := e.list()
if err != nil { if err != nil {
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err) glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
e.stop()
return return
} }
@ -221,12 +223,14 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
m.endpoints[r.ResourceName] = e m.endpoints[r.ResourceName] = e
m.mutex.Unlock() m.mutex.Unlock()
glog.V(2).Infof("Registered endpoint %v", e) glog.V(2).Infof("Registered endpoint %v", e)
if ok && old != nil { if ok && old != nil {
old.stop() old.stop()
} }
go func() { go func() {
e.listAndWatch(stream) e.listAndWatch(stream)
e.stop()
m.mutex.Lock() m.mutex.Lock()
if old, ok := m.endpoints[r.ResourceName]; ok && old == e { if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
@ -235,6 +239,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// Issues callback to delete all of devices. // Issues callback to delete all of devices.
e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices()) e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices())
} }
glog.V(2).Infof("Unregistered endpoint %v", e) glog.V(2).Infof("Unregistered endpoint %v", e)
m.mutex.Unlock() m.mutex.Unlock()
}() }()