diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 1fac9d9465a..cdf6a0c50a9 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -91,7 +91,7 @@ func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { return fmt.Errorf("socket path is empty") } if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok { - klog.V(2).Infof("Plugin (Path %s) exists in actual state cache", pluginInfo.SocketPath) + klog.V(2).InfoS("Plugin exists in actual state cache", "path", pluginInfo.SocketPath) } asw.socketFileToInfo[pluginInfo.SocketPath] = pluginInfo return nil diff --git a/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go b/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go index d8f504d0b62..a190e7707a0 100644 --- a/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/desired_state_of_world.go @@ -128,7 +128,7 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error { return fmt.Errorf("socket path is empty") } if _, ok := dsw.socketFileToInfo[socketPath]; ok { - klog.V(2).Infof("Plugin (Path %s) exists in actual state cache, timestamp will be updated", socketPath) + klog.V(2).InfoS("Plugin exists in actual state cache, timestamp will be updated", "path", socketPath) } // Update the PluginInfo object. diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index bc643e458f2..db6e8d41acb 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -118,7 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( Name: infoResp.Name, }) if err != nil { - klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err) + klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) } if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) @@ -147,7 +147,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) - klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler) + klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil } return unregisterPluginFunc diff --git a/pkg/kubelet/pluginmanager/plugin_manager.go b/pkg/kubelet/pluginmanager/plugin_manager.go index 2697fda873f..79e0c8c46b7 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager.go +++ b/pkg/kubelet/pluginmanager/plugin_manager.go @@ -109,14 +109,14 @@ func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str defer runtime.HandleCrash() pm.desiredStateOfWorldPopulator.Start(stopCh) - klog.V(2).Infof("The desired_state_of_world populator (plugin watcher) starts") + klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts") - klog.Infof("Starting Kubelet Plugin Manager") + klog.InfoS("Starting Kubelet Plugin Manager") go pm.reconciler.Run(stopCh) metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld) <-stopCh - klog.Infof("Shutting down Kubelet Plugin Manager") + klog.InfoS("Shutting down Kubelet Plugin Manager") } func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) { diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go index 8b8228af867..6978826c9ce 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/example_handler.go @@ -121,7 +121,7 @@ func (p *exampleHandler) EventChan(pluginName string) chan examplePluginEvent { } func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) { - klog.V(2).Infof("Sending %v for plugin %s over chan %v", event, pluginName, p.eventChans[pluginName]) + klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName]) p.eventChans[pluginName] <- event } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go b/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go index d0859cc8967..c6be96c9ce4 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin.go @@ -50,7 +50,7 @@ type pluginServiceV1Beta1 struct { } func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) { - klog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field) + klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field) return &v1beta1.ExampleResponse{}, nil } @@ -63,7 +63,7 @@ type pluginServiceV1Beta2 struct { } func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) { - klog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field) + klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field) return &v1beta2.ExampleResponse{}, nil } @@ -105,7 +105,7 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques } func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) { - klog.Errorf("Registration is: %v\n", status) + klog.InfoS("Notify registration status", "status", status) if e.registrationStatus != nil { e.registrationStatus <- *status @@ -116,13 +116,13 @@ func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *re // Serve starts a pluginwatcher server and one or more of the plugin services func (e *examplePlugin) Serve(services ...string) error { - klog.Infof("starting example server at: %s\n", e.endpoint) + klog.InfoS("Starting example server", "endpoint", e.endpoint) lis, err := net.Listen("unix", e.endpoint) if err != nil { return err } - klog.Infof("example server started at: %s\n", e.endpoint) + klog.InfoS("Example server started", "endpoint", e.endpoint) e.grpcServer = grpc.NewServer() // Registers kubelet plugin watcher api. @@ -147,7 +147,7 @@ func (e *examplePlugin) Serve(services ...string) error { defer e.wg.Done() // Blocking call to accept incoming connections. if err := e.grpcServer.Serve(lis); err != nil { - klog.Errorf("example server stopped serving: %v", err) + klog.ErrorS(err, "Example server stopped serving") } }() @@ -155,7 +155,7 @@ func (e *examplePlugin) Serve(services ...string) error { } func (e *examplePlugin) Stop() error { - klog.Infof("Stopping example server at: %s\n", e.endpoint) + klog.InfoS("Stopping example server", "endpoint", e.endpoint) e.grpcServer.Stop() c := make(chan struct{}) diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go index 2a869c186a1..b3a1a8b67e4 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go +++ b/pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go @@ -49,7 +49,7 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) * // Start watches for the creation and deletion of plugin sockets at the path func (w *Watcher) Start(stopCh <-chan struct{}) error { - klog.V(2).Infof("Plugin Watcher Start at %s", w.path) + klog.V(2).InfoS("Plugin Watcher Start", "path", w.path) // Creating the directory to be watched if it doesn't exist yet, // and walks through the directory to discover the existing plugins. @@ -65,7 +65,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. if err := w.traversePluginDir(w.path); err != nil { - klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err) + klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path) } go func(fsWatcher *fsnotify.Watcher) { @@ -76,7 +76,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { if event.Op&fsnotify.Create == fsnotify.Create { err := w.handleCreateEvent(event) if err != nil { - klog.Errorf("error %v when handling create event: %s", err, event) + klog.ErrorS(err, "Error when handling create event", "event", event) } } else if event.Op&fsnotify.Remove == fsnotify.Remove { w.handleDeleteEvent(event) @@ -84,7 +84,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { continue case err := <-fsWatcher.Errors: if err != nil { - klog.Errorf("fsWatcher received error: %v", err) + klog.ErrorS(err, "FsWatcher received error") } continue case <-stopCh: @@ -98,7 +98,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error { } func (w *Watcher) init() error { - klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path) + klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path) if err := w.fs.MkdirAll(w.path, 0755); err != nil { return fmt.Errorf("error (re-)creating root %s: %v", w.path, err) @@ -122,7 +122,7 @@ func (w *Watcher) traversePluginDir(dir string) error { return fmt.Errorf("error accessing path: %s error: %v", path, err) } - klog.Errorf("error accessing path: %s error: %v", path, err) + klog.ErrorS(err, "Error accessing path", "path", path) return nil } @@ -143,10 +143,10 @@ func (w *Watcher) traversePluginDir(dir string) error { } //TODO: Handle errors by taking corrective measures if err := w.handleCreateEvent(event); err != nil { - klog.Errorf("error %v when handling create event: %s", err, event) + klog.ErrorS(err, "Error when handling create", "event", event) } default: - klog.V(5).Infof("Ignoring file %s with mode %v", path, mode) + klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode) } return nil @@ -157,7 +157,7 @@ func (w *Watcher) traversePluginDir(dir string) error { // Files names: // - MUST NOT start with a '.' func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { - klog.V(6).Infof("Handling create event: %v", event) + klog.V(6).InfoS("Handling create event", "event", event) fi, err := os.Stat(event.Name) // TODO: This is a workaround for Windows 20H2 issue for os.Stat(). Please see @@ -171,7 +171,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { } if strings.HasPrefix(fi.Name(), ".") { - klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name()) + klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name()) return nil } @@ -181,7 +181,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err) } if !isSocket { - klog.V(5).Infof("Ignoring non socket file %s", fi.Name()) + klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name()) return nil } @@ -200,7 +200,7 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error { // a possibility that it has been deleted and recreated again before it is // removed from the desired world cache, so we still need to call AddOrUpdatePlugin // in this case to update the timestamp - klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath) + klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath) err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath) if err != nil { return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err) @@ -209,9 +209,9 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error { } func (w *Watcher) handleDeleteEvent(event fsnotify.Event) { - klog.V(6).Infof("Handling delete event: %v", event) + klog.V(6).InfoS("Handling delete event", "event", event) socketPath := event.Name - klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath) + klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath) w.desiredStateOfWorld.RemovePlugin(socketPath) } diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler.go b/pkg/kubelet/pluginmanager/reconciler/reconciler.go index 6cba0045469..d8f104eb809 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler.go @@ -122,7 +122,7 @@ func (rc *reconciler) reconcile() { // with the same socket path but different timestamp. for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() { if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp { - klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("An updated version of plugin has been found, unregistering the plugin first before reregistering", "")) + klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin) unregisterPlugin = true break } @@ -130,17 +130,17 @@ func (rc *reconciler) reconcile() { } if unregisterPlugin { - klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", "")) + klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin) err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. - klog.Errorf(registeredPlugin.GenerateErrorDetailed("operationExecutor.UnregisterPlugin failed", err).Error()) + klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin) } if err == nil { - klog.V(1).Infof(registeredPlugin.GenerateMsgDetailed("operationExecutor.UnregisterPlugin started", "")) + klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin) } } } @@ -148,16 +148,16 @@ func (rc *reconciler) reconcile() { // Ensure plugins that should be registered are registered for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() { if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) { - klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", "")) + klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister) err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld) if err != nil && !goroutinemap.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. - klog.Errorf(pluginToRegister.GenerateErrorDetailed("operationExecutor.RegisterPlugin failed", err).Error()) + klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister) } if err == nil { - klog.V(1).Infof(pluginToRegister.GenerateMsgDetailed("operationExecutor.RegisterPlugin started", "")) + klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister) } } }