From 4d18aa63cd43ba02cd5c482255831ab8944426b1 Mon Sep 17 00:00:00 2001 From: Renaud Gaubert Date: Sat, 11 Aug 2018 16:52:41 +0200 Subject: [PATCH] Refactor pluginwatcher to use the new API --- .../util/pluginwatcher/plugin_watcher.go | 389 ++++++++++++------ 1 file changed, 257 insertions(+), 132 deletions(-) diff --git a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go index 6db743dd4fb..cbc33e47444 100644 --- a/pkg/kubelet/util/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/util/pluginwatcher/plugin_watcher.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "os" + "strings" "sync" "time" @@ -28,43 +29,144 @@ import ( "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" + registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" utilfs "k8s.io/kubernetes/pkg/util/filesystem" ) -// RegisterCallbackFn is the type of the callback function that handlers will provide -type RegisterCallbackFn func(pluginName string, endpoint string, versions []string, socketPath string) (chan bool, error) - // Watcher is the plugin watcher type Watcher struct { path string - handlers map[string]RegisterCallbackFn stopCh chan interface{} fs utilfs.Filesystem fsWatcher *fsnotify.Watcher wg sync.WaitGroup - mutex sync.Mutex + + mutex sync.Mutex + handlers map[string]PluginHandler + plugins map[string]pathInfo + pluginsPool map[string]map[string]*sync.Mutex // map[pluginType][pluginName] +} + +type pathInfo struct { + pluginType string + pluginName string } // NewWatcher provides a new watcher -func NewWatcher(sockDir string) Watcher { - return Watcher{ - path: sockDir, - handlers: make(map[string]RegisterCallbackFn), - fs: &utilfs.DefaultFs{}, +func NewWatcher(sockDir string) *Watcher { + return &Watcher{ + path: sockDir, + fs: &utilfs.DefaultFs{}, + + handlers: make(map[string]PluginHandler), + plugins: make(map[string]pathInfo), + pluginsPool: make(map[string]map[string]*sync.Mutex), } } -// AddHandler 
registers a callback to be invoked for a particular type of plugin -func (w *Watcher) AddHandler(pluginType string, handlerCbkFn RegisterCallbackFn) { +func (w *Watcher) AddHandler(pluginType string, handler PluginHandler) { w.mutex.Lock() defer w.mutex.Unlock() - w.handlers[pluginType] = handlerCbkFn + + w.handlers[pluginType] = handler } -// Creates the plugin directory, if it doesn't already exist. -func (w *Watcher) createPluginDir() error { +func (w *Watcher) getHandler(pluginType string) (PluginHandler, bool) { + w.mutex.Lock() + defer w.mutex.Unlock() + + h, ok := w.handlers[pluginType] + return h, ok +} + +// Start watches for the creation of plugin sockets at the path +func (w *Watcher) Start() error { + glog.V(2).Infof("Plugin Watcher Start at %s", w.path) + w.stopCh = make(chan interface{}) + + // Creating the directory to be watched if it doesn't exist yet, + // and walks through the directory to discover the existing plugins. + if err := w.init(); err != nil { + return err + } + + fsWatcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err) + } + w.fsWatcher = fsWatcher + + w.wg.Add(1) + go func(fsWatcher *fsnotify.Watcher) { + defer w.wg.Done() + for { + select { + case event := <-fsWatcher.Events: + //TODO: Handle errors by taking corrective measures + + w.wg.Add(1) + go func() { + defer w.wg.Done() + + if event.Op&fsnotify.Create == fsnotify.Create { + err := w.handleCreateEvent(event) + if err != nil { + glog.Errorf("error %v when handling create event: %s", err, event) + } + } else if event.Op&fsnotify.Remove == fsnotify.Remove { + err := w.handleDeleteEvent(event) + if err != nil { + glog.Errorf("error %v when handling delete event: %s", err, event) + } + } + return + }() + continue + case err := <-fsWatcher.Errors: + if err != nil { + glog.Errorf("fsWatcher received error: %v", err) + } + continue + case <-w.stopCh: + return + } + } + }(fsWatcher) + + // Traverse plugin dir after 
starting the plugin processing goroutine + if err := w.traversePluginDir(w.path); err != nil { + w.Stop() + return fmt.Errorf("failed to traverse plugin socket path, err: %v", err) + } + + return nil +} + +// Stop stops probing the creation of plugin sockets at the path +func (w *Watcher) Stop() error { + close(w.stopCh) + + c := make(chan struct{}) + go func() { + defer close(c) + w.wg.Wait() + }() + + select { + case <-c: + case <-time.After(11 * time.Second): + return fmt.Errorf("timeout on stopping watcher") + } + + w.fsWatcher.Close() + + return nil +} + +func (w *Watcher) init() error { glog.V(4).Infof("Ensuring Plugin directory at %s ", w.path) + if err := w.fs.MkdirAll(w.path, 0755); err != nil { return fmt.Errorf("error (re-)creating root %s: %v", w.path, err) } @@ -91,22 +193,38 @@ func (w *Watcher) traversePluginDir(dir string) error { Op: fsnotify.Create, } }() + default: + glog.V(5).Infof("Ignoring file %s with mode %v", path, mode) } return nil }) } -func (w *Watcher) init() error { - if err := w.createPluginDir(); err != nil { - return err +// Handle filesystem notify event. +func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { + glog.V(6).Infof("Handling create event: %v", event) + + fi, err := os.Stat(event.Name) + if err != nil { + return fmt.Errorf("stat file %s failed: %v", event.Name, err) } - return nil + + if strings.HasPrefix(fi.Name(), ".") { + glog.Errorf("Ignoring file: %s", fi.Name()) + return nil + } + + if !fi.IsDir() { + return w.handlePluginRegistration(event.Name) + } + + return w.traversePluginDir(event.Name) } -func (w *Watcher) registerPlugin(socketPath string) error { +func (w *Watcher) handlePluginRegistration(socketPath string) error { //TODO: Implement rate limiting to mitigate any DOS kind of attacks. 
- client, conn, err := dial(socketPath) + client, conn, err := dial(socketPath, 10*time.Second) if err != nil { return fmt.Errorf("dial failed at socket %s, err: %v", socketPath, err) } @@ -114,154 +232,161 @@ func (w *Watcher) registerPlugin(socketPath string) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{}) if err != nil { return fmt.Errorf("failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err) } - return w.invokeRegistrationCallbackAtHandler(ctx, client, infoResp, socketPath) -} - -func (w *Watcher) invokeRegistrationCallbackAtHandler(ctx context.Context, client registerapi.RegistrationClient, infoResp *registerapi.PluginInfo, socketPath string) error { - var handlerCbkFn RegisterCallbackFn - var ok bool - handlerCbkFn, ok = w.handlers[infoResp.Type] + handler, ok := w.handlers[infoResp.Type] if !ok { - errStr := fmt.Sprintf("no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath) - if _, err := client.NotifyRegistrationStatus(ctx, &registerapi.RegistrationStatus{ - PluginRegistered: false, - Error: errStr, - }); err != nil { - return errors.Wrap(err, errStr) - } - return errors.New(errStr) + return w.notifyPlugin(client, false, fmt.Sprintf("no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)) } - var versions []string - for _, version := range infoResp.SupportedVersions { - versions = append(versions, version) + // ReRegistration: We want to handle multiple plugins registering at the same time with the same name sequentially. + // See the state machine for more information.
+ // This is done by using a Lock for each plugin with the same name and type + pool := w.getPluginPool(infoResp.Type, infoResp.Name) + + pool.Lock() + defer pool.Unlock() + + if infoResp.Endpoint == "" { + infoResp.Endpoint = socketPath } + // calls handler callback to verify registration request - chanForAckOfNotification, err := handlerCbkFn(infoResp.Name, infoResp.Endpoint, versions, socketPath) - if err != nil { - errStr := fmt.Sprintf("plugin registration failed with err: %v", err) - if _, err := client.NotifyRegistrationStatus(ctx, &registerapi.RegistrationStatus{ - PluginRegistered: false, - Error: errStr, - }); err != nil { - return errors.Wrap(err, errStr) - } - return errors.New(errStr) + if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { + return w.notifyPlugin(client, false, fmt.Sprintf("plugin validation failed with err: %v", err)) } - if _, err := client.NotifyRegistrationStatus(ctx, &registerapi.RegistrationStatus{ - PluginRegistered: true, - }); err != nil { - chanForAckOfNotification <- false + // We add the plugin to the pluginwatcher's map before calling a plugin consumer's Register handle + // so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call. + w.registerPlugin(socketPath, infoResp.Type, infoResp.Name) + + if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint); err != nil { + return w.notifyPlugin(client, false, fmt.Sprintf("plugin registration failed with err: %v", err)) + } + + // Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate + if err := w.notifyPlugin(client, true, ""); err != nil { return fmt.Errorf("failed to send registration status at socket %s, err: %v", socketPath, err) } - chanForAckOfNotification <- true return nil } -// Handle filesystem notify event.
-func (w *Watcher) handleFsNotifyEvent(event fsnotify.Event) error { - if event.Op&fsnotify.Create != fsnotify.Create { +func (w *Watcher) handleDeleteEvent(event fsnotify.Event) error { + glog.V(6).Infof("Handling delete event: %v", event) + + plugin, ok := w.getPlugin(event.Name) + if !ok { + return fmt.Errorf("could not find plugin for deleted file %s", event.Name) + } + + // You should not get a Deregister call while registering a plugin + pool := w.getPluginPool(plugin.pluginType, plugin.pluginName) + + pool.Lock() + defer pool.Unlock() + + // ReRegisteration: When waiting for the lock a plugin with the same name (not socketPath) could have registered + // In that case, we don't want to issue a DeRegister call for that plugin + // When ReRegistering, the new plugin will have removed the current mapping (map[socketPath] = plugin) and replaced + // it with it's own socketPath. + if _, ok = w.getPlugin(event.Name); !ok { + glog.V(2).Infof("A newer plugin watcher has been registered for plugin %v, dropping DeRegister call", plugin) return nil } - fi, err := os.Stat(event.Name) - if err != nil { - return fmt.Errorf("stat file %s failed: %v", event.Name, err) + h, ok := w.getHandler(plugin.pluginType) + if !ok { + return fmt.Errorf("could not find handler %s for plugin %s at path %s", plugin.pluginType, plugin.pluginName, event.Name) } - if !fi.IsDir() { - return w.registerPlugin(event.Name) - } - - if err := w.traversePluginDir(event.Name); err != nil { - return fmt.Errorf("failed to traverse plugin path %s, err: %v", event.Name, err) - } + glog.V(2).Infof("DeRegistering plugin %v at path %s", plugin, event.Name) + w.deRegisterPlugin(event.Name, plugin.pluginType, plugin.pluginName) + h.DeRegisterPlugin(plugin.pluginName) return nil } -// Start watches for the creation of plugin sockets at the path -func (w *Watcher) Start() error { - glog.V(2).Infof("Plugin Watcher Start at %s", w.path) - w.stopCh = make(chan interface{}) +func (w *Watcher) 
registerPlugin(socketPath, pluginType, pluginName string) { + w.mutex.Lock() + defer w.mutex.Unlock() - // Creating the directory to be watched if it doesn't exist yet, - // and walks through the directory to discover the existing plugins. - if err := w.init(); err != nil { - return err - } - - fsWatcher, err := fsnotify.NewWatcher() - if err != nil { - return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err) - } - w.fsWatcher = fsWatcher - - if err := w.traversePluginDir(w.path); err != nil { - fsWatcher.Close() - return fmt.Errorf("failed to traverse plugin socket path, err: %v", err) - } - - w.wg.Add(1) - go func(fsWatcher *fsnotify.Watcher) { - defer w.wg.Done() - for { - select { - case event := <-fsWatcher.Events: - //TODO: Handle errors by taking corrective measures - go func() { - err := w.handleFsNotifyEvent(event) - if err != nil { - glog.Errorf("error %v when handle event: %s", err, event) - } - }() - continue - case err := <-fsWatcher.Errors: - if err != nil { - glog.Errorf("fsWatcher received error: %v", err) - } - continue - case <-w.stopCh: - fsWatcher.Close() - return - } + // Reregistration case, if this plugin is already in the map, remove it + // This will prevent handleDeleteEvent to issue a DeRegister call + for path, info := range w.plugins { + if info.pluginType != pluginType || info.pluginName != pluginName { + continue } - }(fsWatcher) - return nil -} -// Stop stops probing the creation of plugin sockets at the path -func (w *Watcher) Stop() error { - close(w.stopCh) - c := make(chan struct{}) - go func() { - defer close(c) - w.wg.Wait() - }() - select { - case <-c: - case <-time.After(10 * time.Second): - return fmt.Errorf("timeout on stopping watcher") + delete(w.plugins, path) + break + } + + w.plugins[socketPath] = pathInfo{ + pluginType: pluginType, + pluginName: pluginName, } - return nil } -// Cleanup cleans the path by removing sockets -func (w *Watcher) Cleanup() error { - return os.RemoveAll(w.path) +func (w *Watcher) 
deRegisterPlugin(socketPath, pluginType, pluginName string) { + w.mutex.Lock() + defer w.mutex.Unlock() + + delete(w.plugins, socketPath) + delete(w.pluginsPool[pluginType], pluginName) +} + +func (w *Watcher) getPlugin(socketPath string) (pathInfo, bool) { + w.mutex.Lock() + defer w.mutex.Unlock() + + plugin, ok := w.plugins[socketPath] + return plugin, ok +} + +func (w *Watcher) getPluginPool(pluginType, pluginName string) *sync.Mutex { + w.mutex.Lock() + defer w.mutex.Unlock() + + if _, ok := w.pluginsPool[pluginType]; !ok { + w.pluginsPool[pluginType] = make(map[string]*sync.Mutex) + } + + if _, ok := w.pluginsPool[pluginType][pluginName]; !ok { + w.pluginsPool[pluginType][pluginName] = &sync.Mutex{} + } + + return w.pluginsPool[pluginType][pluginName] +} + +func (w *Watcher) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + status := &registerapi.RegistrationStatus{ + PluginRegistered: registered, + Error: errStr, + } + + if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil { + return errors.Wrap(err, errStr) + } + + if errStr != "" { + return errors.New(errStr) + } + + return nil } // Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial -func dial(unixSocketPath string) (registerapi.RegistrationClient, *grpc.ClientConn, error) { +func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithTimeout(10*time.Second), + grpc.WithTimeout(timeout), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) }),