migrate pkg/kubelet/pluginmanager to structured logging

This commit is contained in:
qingwave 2021-03-06 14:06:39 +08:00
parent e95a8c878a
commit f405c797ff
8 changed files with 36 additions and 36 deletions

View File

@ -91,7 +91,7 @@ func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
return fmt.Errorf("socket path is empty")
}
if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok {
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache", pluginInfo.SocketPath)
klog.V(2).InfoS("Plugin exists in actual state cache", "path", pluginInfo.SocketPath)
}
asw.socketFileToInfo[pluginInfo.SocketPath] = pluginInfo
return nil

View File

@ -128,7 +128,7 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
return fmt.Errorf("socket path is empty")
}
if _, ok := dsw.socketFileToInfo[socketPath]; ok {
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache, timestamp will be updated", socketPath)
klog.V(2).InfoS("Plugin exists in actual state cache, timestamp will be updated", "path", socketPath)
}
// Update the PluginInfo object.

View File

@ -118,7 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
Name: infoResp.Name,
})
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
@ -147,7 +147,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc(
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler)
klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
return nil
}
return unregisterPluginFunc

View File

@ -109,14 +109,14 @@ func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str
defer runtime.HandleCrash()
pm.desiredStateOfWorldPopulator.Start(stopCh)
klog.V(2).Infof("The desired_state_of_world populator (plugin watcher) starts")
klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")
klog.Infof("Starting Kubelet Plugin Manager")
klog.InfoS("Starting Kubelet Plugin Manager")
go pm.reconciler.Run(stopCh)
metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
<-stopCh
klog.Infof("Shutting down Kubelet Plugin Manager")
klog.InfoS("Shutting down Kubelet Plugin Manager")
}
func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {

View File

@ -121,7 +121,7 @@ func (p *exampleHandler) EventChan(pluginName string) chan examplePluginEvent {
}
func (p *exampleHandler) SendEvent(pluginName string, event examplePluginEvent) {
klog.V(2).Infof("Sending %v for plugin %s over chan %v", event, pluginName, p.eventChans[pluginName])
klog.V(2).InfoS("Sending event for plugin", "pluginName", pluginName, "event", event, "channel", p.eventChans[pluginName])
p.eventChans[pluginName] <- event
}

View File

@ -50,7 +50,7 @@ type pluginServiceV1Beta1 struct {
}
func (s *pluginServiceV1Beta1) GetExampleInfo(ctx context.Context, rqt *v1beta1.ExampleRequest) (*v1beta1.ExampleResponse, error) {
klog.Infof("GetExampleInfo v1beta1field: %s", rqt.V1Beta1Field)
klog.InfoS("GetExampleInfo v1beta1field", "field", rqt.V1Beta1Field)
return &v1beta1.ExampleResponse{}, nil
}
@ -63,7 +63,7 @@ type pluginServiceV1Beta2 struct {
}
func (s *pluginServiceV1Beta2) GetExampleInfo(ctx context.Context, rqt *v1beta2.ExampleRequest) (*v1beta2.ExampleResponse, error) {
klog.Infof("GetExampleInfo v1beta2_field: %s", rqt.V1Beta2Field)
klog.InfoS("GetExampleInfo v1beta2_field", "field", rqt.V1Beta2Field)
return &v1beta2.ExampleResponse{}, nil
}
@ -105,7 +105,7 @@ func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoReques
}
func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
klog.Errorf("Registration is: %v\n", status)
klog.InfoS("Notify registration status", "status", status)
if e.registrationStatus != nil {
e.registrationStatus <- *status
@ -116,13 +116,13 @@ func (e *examplePlugin) NotifyRegistrationStatus(ctx context.Context, status *re
// Serve starts a pluginwatcher server and one or more of the plugin services
func (e *examplePlugin) Serve(services ...string) error {
klog.Infof("starting example server at: %s\n", e.endpoint)
klog.InfoS("Starting example server", "endpoint", e.endpoint)
lis, err := net.Listen("unix", e.endpoint)
if err != nil {
return err
}
klog.Infof("example server started at: %s\n", e.endpoint)
klog.InfoS("Example server started", "endpoint", e.endpoint)
e.grpcServer = grpc.NewServer()
// Registers kubelet plugin watcher api.
@ -147,7 +147,7 @@ func (e *examplePlugin) Serve(services ...string) error {
defer e.wg.Done()
// Blocking call to accept incoming connections.
if err := e.grpcServer.Serve(lis); err != nil {
klog.Errorf("example server stopped serving: %v", err)
klog.ErrorS(err, "Example server stopped serving")
}
}()
@ -155,7 +155,7 @@ func (e *examplePlugin) Serve(services ...string) error {
}
func (e *examplePlugin) Stop() error {
klog.Infof("Stopping example server at: %s\n", e.endpoint)
klog.InfoS("Stopping example server", "endpoint", e.endpoint)
e.grpcServer.Stop()
c := make(chan struct{})

View File

@ -49,7 +49,7 @@ func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *
// Start watches for the creation and deletion of plugin sockets at the path
func (w *Watcher) Start(stopCh <-chan struct{}) error {
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
klog.V(2).InfoS("Plugin Watcher Start", "path", w.path)
// Creating the directory to be watched if it doesn't exist yet,
// and walks through the directory to discover the existing plugins.
@ -65,7 +65,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
if err := w.traversePluginDir(w.path); err != nil {
klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
}
go func(fsWatcher *fsnotify.Watcher) {
@ -76,7 +76,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
klog.ErrorS(err, "Error when handling create event", "event", event)
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
w.handleDeleteEvent(event)
@ -84,7 +84,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
continue
case err := <-fsWatcher.Errors:
if err != nil {
klog.Errorf("fsWatcher received error: %v", err)
klog.ErrorS(err, "FsWatcher received error")
}
continue
case <-stopCh:
@ -98,7 +98,7 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
}
func (w *Watcher) init() error {
klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
klog.V(4).InfoS("Ensuring Plugin directory", "path", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
@ -122,7 +122,7 @@ func (w *Watcher) traversePluginDir(dir string) error {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
}
klog.Errorf("error accessing path: %s error: %v", path, err)
klog.ErrorS(err, "Error accessing path", "path", path)
return nil
}
@ -143,10 +143,10 @@ func (w *Watcher) traversePluginDir(dir string) error {
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
klog.ErrorS(err, "Error when handling create", "event", event)
}
default:
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode)
}
return nil
@ -157,7 +157,7 @@ func (w *Watcher) traversePluginDir(dir string) error {
// Files names:
// - MUST NOT start with a '.'
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)
klog.V(6).InfoS("Handling create event", "event", event)
fi, err := os.Stat(event.Name)
// TODO: This is a workaround for Windows 20H2 issue for os.Stat(). Please see
@ -171,7 +171,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
}
if strings.HasPrefix(fi.Name(), ".") {
klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name())
return nil
}
@ -181,7 +181,7 @@ func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
}
if !isSocket {
klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name())
return nil
}
@ -200,7 +200,7 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
// a possibility that it has been deleted and recreated again before it is
// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
// in this case to update the timestamp
klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
@ -209,9 +209,9 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
}
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
klog.V(6).Infof("Handling delete event: %v", event)
klog.V(6).InfoS("Handling delete event", "event", event)
socketPath := event.Name
klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath)
w.desiredStateOfWorld.RemovePlugin(socketPath)
}

View File

@ -122,7 +122,7 @@ func (rc *reconciler) reconcile() {
// with the same socket path but different timestamp.
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("An updated version of plugin has been found, unregistering the plugin first before reregistering", ""))
klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
unregisterPlugin = true
break
}
@ -130,17 +130,17 @@ func (rc *reconciler) reconcile() {
}
if unregisterPlugin {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(registeredPlugin.GenerateErrorDetailed("operationExecutor.UnregisterPlugin failed", err).Error())
klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)
}
if err == nil {
klog.V(1).Infof(registeredPlugin.GenerateMsgDetailed("operationExecutor.UnregisterPlugin started", ""))
klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)
}
}
}
@ -148,16 +148,16 @@ func (rc *reconciler) reconcile() {
// Ensure plugins that should be registered are registered
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
klog.Errorf(pluginToRegister.GenerateErrorDetailed("operationExecutor.RegisterPlugin failed", err).Error())
klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)
}
if err == nil {
klog.V(1).Infof(pluginToRegister.GenerateMsgDetailed("operationExecutor.RegisterPlugin started", ""))
klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)
}
}
}