Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 11:50:44 +00:00
Commit 51ea8de487: Merge pull request #99976 from knabben/sl-devicemanager

Migrate pkg/kubelet/cm/devicemanager to structured logging
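The diff below applies klog's structured-logging pattern across the device manager, its plugin endpoint, the pod devices bookkeeping, and the test stub: printf-style calls (klog.Infof, klog.Warningf, klog.Errorf) become klog.InfoS and klog.ErrorS, which take a constant message followed by alternating key/value pairs, and any error object moves into the dedicated first argument of ErrorS (nil is passed when there is no error value, as in the "out of sync" messages further down). A minimal, self-contained sketch of the before/after shape; the socket path and messages here are illustrative and not taken from the kubelet code:

    package main

    import (
        "flag"
        "net"

        "k8s.io/klog/v2"
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()
        defer klog.Flush()

        socketPath := "/tmp/device-plugin-example.sock" // illustrative path

        // Before: message and values interleaved with format verbs.
        klog.Infof("Starting to serve on %v", socketPath)

        // After: constant message plus key/value pairs that structured sinks can index.
        klog.InfoS("Starting to serve on socket", "socket", socketPath)

        ln, err := net.Listen("unix", socketPath)
        if err != nil {
            // Before: klog.Errorf("failed to listen to %s: %v", socketPath, err)
            // After: the error becomes ErrorS's first argument; pass nil when logging
            // at error severity without an error object.
            klog.ErrorS(err, "Failed to listen to socket", "socket", socketPath)
            return
        }
        defer ln.Close()
    }

The key names ("socket", "path", "resourceName", "podUID", and so on) are what downstream tooling filters on, which is also why several messages in the diff are reworded so that they stay constant across log lines.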
@@ -139,8 +139,7 @@ func (m *Stub) Start() error {
         return lastDialErr
     }
 
-    klog.Infof("Starting to serve on %v", m.socket)
-
+    klog.InfoS("Starting to serve on socket", "socket", m.socket)
     return nil
 }
 
@@ -161,7 +160,7 @@ func (m *Stub) Stop() error {
 
 // GetInfo is the RPC which return pluginInfo
 func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) {
-    klog.Info("GetInfo")
+    klog.InfoS("GetInfo")
     return &watcherapi.PluginInfo{
         Type: watcherapi.DevicePlugin,
         Name: m.resourceName,
@@ -175,7 +174,7 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.
         m.registrationStatus <- *status
     }
     if !status.PluginRegistered {
-        klog.Infof("Registration failed: %v", status.Error)
+        klog.InfoS("Registration failed", "err", status.Error)
     }
     return &watcherapi.RegistrationStatusResponse{}, nil
 }
@@ -184,7 +183,7 @@ func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.
 func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error {
     if pluginSockDir != "" {
         if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil {
-            klog.Info("Deprecation file found. Skip registration.")
+            klog.InfoS("Deprecation file found. Skip registration")
             return nil
         }
     }
@@ -229,13 +228,13 @@ func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (
 
 // PreStartContainer resets the devices received
 func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
-    klog.Infof("PreStartContainer, %+v", r)
+    klog.InfoS("PreStartContainer", "request", r)
     return &pluginapi.PreStartContainerResponse{}, nil
 }
 
 // ListAndWatch lists devices and update that list according to the Update call
 func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
-    klog.Info("ListAndWatch")
+    klog.InfoS("ListAndWatch")
 
     s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
 
@@ -256,7 +255,7 @@ func (m *Stub) Update(devs []*pluginapi.Device) {
 
 // GetPreferredAllocation gets the preferred allocation from a set of available devices
 func (m *Stub) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
-    klog.Infof("GetPreferredAllocation, %+v", r)
+    klog.InfoS("GetPreferredAllocation", "request", r)
 
     devs := make(map[string]pluginapi.Device)
 
@@ -269,7 +268,7 @@ func (m *Stub) GetPreferredAllocation(ctx context.Context, r *pluginapi.Preferre
 
 // Allocate does a mock allocation
 func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
-    klog.Infof("Allocate, %+v", r)
+    klog.InfoS("Allocate", "request", r)
 
     devs := make(map[string]pluginapi.Device)
 
@@ -60,7 +60,7 @@ type endpointImpl struct {
 func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
     client, c, err := dial(socketPath)
     if err != nil {
-        klog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
+        klog.ErrorS(err, "Can't create new endpoint with socket path", "path", socketPath)
         return nil, err
     }
 
@@ -96,7 +96,7 @@ func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device)
 func (e *endpointImpl) run() {
     stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
     if err != nil {
-        klog.Errorf(errListAndWatch, e.resourceName, err)
+        klog.ErrorS(err, "listAndWatch ended unexpectedly for device plugin", "resourceName", e.resourceName)
 
         return
     }
@@ -104,12 +104,12 @@ func (e *endpointImpl) run() {
     for {
         response, err := stream.Recv()
         if err != nil {
-            klog.Errorf(errListAndWatch, e.resourceName, err)
+            klog.ErrorS(err, "listAndWatch ended unexpectedly for device plugin", "resourceName", e.resourceName)
             return
         }
 
         devs := response.Devices
-        klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
+        klog.V(2).InfoS("State pushed for device plugin", "resourceName", e.resourceName)
 
         var newDevs []pluginapi.Device
         for _, d := range devs {
@@ -47,7 +47,6 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
-    "k8s.io/kubernetes/pkg/kubelet/util/format"
     schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/util/selinux"
 )
@@ -134,7 +133,7 @@ func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologym
 }
 
 func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
-    klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
+    klog.V(2).InfoS("Creating Device Plugin manager", "path", socketPath)
 
     if socketPath == "" || !filepath.IsAbs(socketPath) {
         return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
@@ -190,7 +189,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
     }
     m.mutex.Unlock()
     if err := m.writeCheckpoint(); err != nil {
-        klog.Errorf("writing checkpoint encountered %v", err)
+        klog.ErrorS(err, "Writing checkpoint encountered")
     }
 }
 
@@ -215,7 +214,7 @@ func (m *ManagerImpl) removeContents(dir string) error {
         // its a socket, on windows.
         stat, err := os.Lstat(filePath)
         if err != nil {
-            klog.Errorf("Failed to stat file %s: %v", filePath, err)
+            klog.ErrorS(err, "Failed to stat file", "path", filePath)
             continue
         }
         if stat.IsDir() {
@@ -224,7 +223,7 @@ func (m *ManagerImpl) removeContents(dir string) error {
         err = os.RemoveAll(filePath)
         if err != nil {
             errs = append(errs, err)
-            klog.Errorf("Failed to remove file %s: %v", filePath, err)
+            klog.ErrorS(err, "Failed to remove file", "path", filePath)
             continue
         }
     }
@@ -240,7 +239,7 @@ func (m *ManagerImpl) checkpointFile() string {
 // podDevices and allocatedDevices information from checkpointed state and
 // starts device plugin registration service.
 func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
-    klog.V(2).Infof("Starting Device Plugin manager")
+    klog.V(2).InfoS("Starting Device Plugin manager")
 
     m.activePods = activePods
     m.sourcesReady = sourcesReady
@@ -248,7 +247,7 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
     // Loads in allocatedDevices information from disk.
     err := m.readCheckpoint()
     if err != nil {
-        klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
+        klog.InfoS("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date", "err", err)
     }
 
     socketPath := filepath.Join(m.socketdir, m.socketname)
@@ -257,19 +256,19 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
     }
     if selinux.SELinuxEnabled() {
         if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
-            klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
+            klog.InfoS("Unprivileged containerized plugins might not work. Could not set selinux context on socket dir", "path", m.socketdir, "err", err)
         }
     }
 
     // Removes all stale sockets in m.socketdir. Device plugins can monitor
     // this and use it as a signal to re-register with the new Kubelet.
     if err := m.removeContents(m.socketdir); err != nil {
-        klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
+        klog.ErrorS(err, "Fail to clean up stale content under socket dir", "path", m.socketdir)
     }
 
     s, err := net.Listen("unix", socketPath)
     if err != nil {
-        klog.Errorf(errListenSocket+" %v", err)
+        klog.ErrorS(err, "Failed to listen to socket while starting device plugin registry")
         return err
     }
 
@@ -282,7 +281,7 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
         m.server.Serve(s)
     }()
 
-    klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
+    klog.V(2).InfoS("Serving device plugin registration server on socket", "path", socketPath)
 
     return nil
 }
@@ -290,10 +289,10 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
 // GetWatcherHandler returns the plugin handler
 func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
     if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
-        klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
+        klog.ErrorS(err, "Failed to create deprecation file at socket dir", "path", m.socketdir)
     } else {
         f.Close()
-        klog.V(4).Infof("created deprecation file %s", f.Name())
+        klog.V(4).InfoS("Created deprecation file", "path", f.Name())
     }
 
     return cache.PluginHandler(m)
@@ -301,7 +300,7 @@ func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
 
 // ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
 func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
-    klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
+    klog.V(2).InfoS("Got Plugin at endpoint with versions", "plugin", pluginName, "endpoint", endpoint, "versions", versions)
 
     if !m.isVersionCompatibleWithPlugin(versions) {
         return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
@@ -318,7 +317,7 @@ func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, version
 // TODO: Start the endpoint and wait for the First ListAndWatch call
 // before registering the plugin
 func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
-    klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
+    klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)
 
     e, err := newEndpointImpl(endpoint, pluginName, m.callback)
     if err != nil {
@@ -412,7 +411,7 @@ func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, a
 
 // Register registers a device plugin.
 func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
-    klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
+    klog.InfoS("Got registration request from device plugin with resource", "resourceName", r.ResourceName)
     metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
     var versionCompatible bool
     for _, v := range pluginapi.SupportedVersions {
@@ -422,15 +421,15 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest
         }
     }
     if !versionCompatible {
-        errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
-        klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
-        return &pluginapi.Empty{}, fmt.Errorf(errorString)
+        err := fmt.Errorf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
+        klog.InfoS("Bad registration request from device plugin with resource", "resourceName", r.ResourceName, "err", err)
+        return &pluginapi.Empty{}, err
     }
 
     if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
-        errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
-        klog.Infof("Bad registration request from device plugin: %s", errorString)
-        return &pluginapi.Empty{}, fmt.Errorf(errorString)
+        err := fmt.Errorf(errInvalidResourceName, r.ResourceName)
+        klog.InfoS("Bad registration request from device plugin", "err", err)
+        return &pluginapi.Empty{}, err
     }
 
     // TODO: for now, always accepts newest device plugin. Later may consider to
@@ -466,7 +465,7 @@ func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.D
     defer m.mutex.Unlock()
 
     m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
-    klog.V(2).Infof("Registered endpoint %v", e)
+    klog.V(2).InfoS("Registered endpoint", "endpoint", e)
 }
 
 func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
@@ -480,13 +479,13 @@ func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
         m.markResourceUnhealthy(resourceName)
     }
 
-    klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
+    klog.V(2).InfoS("Endpoint became unhealthy", "resourceName", resourceName, "endpoint", e)
 }
 
 func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
     new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
     if err != nil {
-        klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
+        klog.ErrorS(err, "Failed to dial device plugin with request", "request", r)
         return
     }
     m.registerEndpoint(r.ResourceName, r.Options, new)
@@ -496,7 +495,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 }
 
 func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
-    klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
+    klog.V(2).InfoS("Mark all resources Unhealthy for resource", "resourceName", resourceName)
     healthyDevices := sets.NewString()
     if _, ok := m.healthyDevices[resourceName]; ok {
         healthyDevices = m.healthyDevices[resourceName]
@@ -533,7 +532,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
             // should always be consistent. Otherwise, we run with the risk
             // of failing to garbage collect non-existing resources or devices.
             if !ok {
-                klog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
+                klog.ErrorS(nil, "Unexpected: healthyDevices and endpoints are out of sync")
             }
             delete(m.endpoints, resourceName)
             delete(m.healthyDevices, resourceName)
@@ -548,7 +547,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
         eI, ok := m.endpoints[resourceName]
         if (ok && eI.e.stopGracePeriodExpired()) || !ok {
             if !ok {
-                klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
+                klog.ErrorS(nil, "Unexpected: unhealthyDevices and endpoints are out of sync")
             }
             delete(m.endpoints, resourceName)
             delete(m.unhealthyDevices, resourceName)
@@ -564,7 +563,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
     m.mutex.Unlock()
     if needsUpdateCheckpoint {
         if err := m.writeCheckpoint(); err != nil {
-            klog.Errorf("writing checkpoint encountered %v", err)
+            klog.ErrorS(err, "Error on writing checkpoint")
         }
     }
     return capacity, allocatable, deletedResources.UnsortedList()
@@ -598,7 +597,7 @@ func (m *ManagerImpl) readCheckpoint() error {
     err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
     if err != nil {
         if err == errors.ErrCheckpointNotFound {
-            klog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
+            klog.InfoS("Failed to retrieve checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
             return nil
         }
         return err
@@ -633,7 +632,7 @@ func (m *ManagerImpl) UpdateAllocatedDevices() {
     if len(podsToBeRemoved) <= 0 {
         return
     }
-    klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
+    klog.V(3).InfoS("Pods to be removed", "podUIDs", podsToBeRemoved.List())
    m.podDevices.delete(podsToBeRemoved.List())
     // Regenerated allocatedDevices after we update pod allocation information.
     m.allocatedDevices = m.podDevices.devices()
@@ -649,7 +648,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
     // This can happen if a container restarts for example.
     devices := m.podDevices.containerDevices(podUID, contName, resource)
     if devices != nil {
-        klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, string(podUID), devices.List())
+        klog.V(3).InfoS("Found pre-allocated devices for resource on pod", "resourceName", resource, "containerName", contName, "podUID", string(podUID), "devices", devices.List())
         needed = needed - devices.Len()
         // A pod's resource is not expected to change once admitted by the API server,
         // so just fail loudly here. We can revisit this part if this no longer holds.
@@ -661,7 +660,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
         // No change, no work.
         return nil, nil
     }
-    klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, string(podUID), contName)
+    klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
     // Check if resource registered with devicemanager
     if _, ok := m.healthyDevices[resource]; !ok {
         return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
@@ -846,7 +845,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
     for k, v := range container.Resources.Limits {
         resource := string(k)
         needed := int(v.Value())
-        klog.V(3).Infof("needs %d %s", needed, resource)
+        klog.V(3).InfoS("Looking for needed resources", "needed", needed, "resourceName", resource)
         if !m.isDevicePluginResource(resource) {
             continue
         }
@@ -892,7 +891,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
         devs := allocDevices.UnsortedList()
         // TODO: refactor this part of code to just append a ContainerAllocationRequest
         // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
-        klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
+        klog.V(3).InfoS("Making allocation request for device plugin", "devices", devs, "resourceName", resource)
         resp, err := eI.e.allocate(devs)
         metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
         if err != nil {
@@ -956,7 +955,7 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
         }
     }
     if needsReAllocate {
-        klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", format.Pod(pod), container.Name)
+        klog.V(2).InfoS("Needs to re-allocate device plugin resources for pod", "pod", klog.KObj(pod), "containerName", container.Name)
         if err := m.Allocate(pod, container); err != nil {
             return nil, err
         }
@@ -976,7 +975,7 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
 
     if eI.opts == nil || !eI.opts.PreStartRequired {
         m.mutex.Unlock()
-        klog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource)
+        klog.V(4).InfoS("Plugin options indicate to skip PreStartContainer for resource", "resourceName", resource)
         return nil
     }
 
@@ -988,7 +987,7 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
 
     m.mutex.Unlock()
     devs := devices.UnsortedList()
-    klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, string(podUID))
+    klog.V(4).InfoS("Issuing a PreStartContainer call for container", "containerName", contName, "podUID", string(podUID))
     _, err := eI.e.preStartContainer(devs)
     if err != nil {
         return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
@@ -1006,12 +1005,12 @@ func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, re
     }
 
     if eI.opts == nil || !eI.opts.GetPreferredAllocationAvailable {
-        klog.V(4).Infof("Plugin options indicate to skip GetPreferredAllocation for resource: %s", resource)
+        klog.V(4).InfoS("Plugin options indicate to skip GetPreferredAllocation for resource", "resourceName", resource)
         return nil, nil
     }
 
     m.mutex.Unlock()
-    klog.V(4).Infof("Issuing a GetPreferredAllocation call for container, %s, of pod %s", contName, string(podUID))
+    klog.V(4).InfoS("Issuing a GetPreferredAllocation call for container", "containerName", contName, "podUID", string(podUID))
     resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
     m.mutex.Lock()
     if err != nil {
@@ -186,13 +186,13 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
         for conName, resources := range containerDevices {
             for resource, devices := range resources {
                 if devices.allocResp == nil {
-                    klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
+                    klog.ErrorS(nil, "Can't marshal allocResp, allocation response is missing", "podUID", podUID, "containerName", conName, "resourceName", resource)
                     continue
                 }
 
                 allocResp, err := devices.allocResp.Marshal()
                 if err != nil {
-                    klog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
+                    klog.ErrorS(err, "Can't marshal allocResp", "podUID", podUID, "containerName", conName, "resourceName", resource)
                     continue
                 }
                 data = append(data, checkpoint.PodDevicesEntry{
@@ -210,12 +210,13 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
 // Populates podDevices from the passed in checkpointData.
 func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
     for _, entry := range data {
-        klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
-            entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
+        klog.V(2).InfoS("Get checkpoint entry",
+            "podUID", entry.PodUID, "containerName", entry.ContainerName,
+            "resourceName", entry.ResourceName, "deviceIDs", entry.DeviceIDs, "allocated", entry.AllocResp)
         allocResp := &pluginapi.ContainerAllocateResponse{}
         err := allocResp.Unmarshal(entry.AllocResp)
         if err != nil {
-            klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
+            klog.ErrorS(err, "Can't unmarshal allocResp", "podUID", entry.PodUID, "containerName", entry.ContainerName, "resourceName", entry.ResourceName)
             continue
         }
         pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, allocResp)
@@ -253,13 +254,13 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi
         // Updates RunContainerOptions.Envs.
         for k, v := range resp.Envs {
             if e, ok := envsMap[k]; ok {
-                klog.V(4).Infof("Skip existing env %s %s", k, v)
+                klog.V(4).InfoS("Skip existing env", "envKey", k, "envValue", v)
                 if e != v {
-                    klog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
+                    klog.ErrorS(nil, "Environment variable has conflicting setting", "envKey", k, "expected", v, "got", e)
                 }
                 continue
             }
-            klog.V(4).Infof("Add env %s %s", k, v)
+            klog.V(4).InfoS("Add env", "envKey", k, "envValue", v)
             envsMap[k] = v
             opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
         }
@@ -267,14 +268,14 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi
         // Updates RunContainerOptions.Devices.
         for _, dev := range resp.Devices {
             if d, ok := devsMap[dev.ContainerPath]; ok {
-                klog.V(4).Infof("Skip existing device %s %s", dev.ContainerPath, dev.HostPath)
+                klog.V(4).InfoS("Skip existing device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
                 if d != dev.HostPath {
-                    klog.Errorf("Container device %s has conflicting mapping host devices: %s and %s",
-                        dev.ContainerPath, d, dev.HostPath)
+                    klog.ErrorS(nil, "Container device has conflicting mapping host devices",
+                        "containerPath", dev.ContainerPath, "got", d, "expected", dev.HostPath)
                 }
                 continue
             }
-            klog.V(4).Infof("Add device %s %s", dev.ContainerPath, dev.HostPath)
+            klog.V(4).InfoS("Add device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
             devsMap[dev.ContainerPath] = dev.HostPath
             opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
                 PathOnHost: dev.HostPath,
@@ -286,14 +287,14 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi
         // Updates RunContainerOptions.Mounts.
         for _, mount := range resp.Mounts {
             if m, ok := mountsMap[mount.ContainerPath]; ok {
-                klog.V(4).Infof("Skip existing mount %s %s", mount.ContainerPath, mount.HostPath)
+                klog.V(4).InfoS("Skip existing mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
                 if m != mount.HostPath {
-                    klog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s",
-                        mount.ContainerPath, m, mount.HostPath)
+                    klog.ErrorS(nil, "Container mount has conflicting mapping host mounts",
+                        "containerPath", mount.ContainerPath, "conflictingPath", m, "hostPath", mount.HostPath)
                 }
                 continue
             }
-            klog.V(4).Infof("Add mount %s %s", mount.ContainerPath, mount.HostPath)
+            klog.V(4).InfoS("Add mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
             mountsMap[mount.ContainerPath] = mount.HostPath
             opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
                 Name: mount.ContainerPath,
@@ -308,13 +309,13 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi
         // Updates for Annotations
         for k, v := range resp.Annotations {
             if e, ok := annotationsMap[k]; ok {
-                klog.V(4).Infof("Skip existing annotation %s %s", k, v)
+                klog.V(4).InfoS("Skip existing annotation", "annotationKey", k, "annotationValue", v)
                 if e != v {
-                    klog.Errorf("Annotation %s has conflicting setting: %s and %s", k, e, v)
+                    klog.ErrorS(nil, "Annotation has conflicting setting", "annotationKey", k, "expected", e, "got", v)
                 }
                 continue
             }
-            klog.V(4).Infof("Add annotation %s %s", k, v)
+            klog.V(4).InfoS("Add annotation", "annotationKey", k, "annotationValue", v)
             annotationsMap[k] = v
             opts.Annotations = append(opts.Annotations, kubecontainer.Annotation{Name: k, Value: v})
         }
@@ -108,10 +108,6 @@ const (
     errEndpointStopped = "endpoint %v has been stopped"
     // errBadSocket is the error raised when the registry socket path is not absolute
     errBadSocket = "bad socketPath, must be an absolute path:"
-    // errListenSocket is the error raised when the registry could not listen on the socket
-    errListenSocket = "failed to listen to socket while starting device plugin registry, with error"
-    // errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
-    errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
 )
 
 // endpointStopGracePeriod indicates the grace period after an endpoint is stopped
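For comparison, the practical effect on a rendered kubelet log line, taking the newManagerImpl message from the hunks above. The timestamp, PID, source line, and socket path below are illustrative, and the exact layout depends on the log format the kubelet is configured with:

Before (klog.V(2).Infof):
    I0316 10:20:30.123456    1234 manager.go:134] Creating Device Plugin manager at /var/lib/kubelet/device-plugins/kubelet.sock

After (klog.V(2).InfoS):
    I0316 10:20:30.123456    1234 manager.go:133] "Creating Device Plugin manager" path="/var/lib/kubelet/device-plugins/kubelet.sock"

The quoted constant message plus key="value" pairs is what makes the structured form machine-parseable without regexes tuned to each call site.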