node: devicemgr: Address warnings from golint

Signed-off-by: Swati Sehgal <swsehgal@redhat.com>
This commit is contained in:
Swati Sehgal 2022-10-25 13:30:05 +01:00
parent 8b29eded52
commit 40741681a2
9 changed files with 39 additions and 16 deletions

View File

@ -74,10 +74,10 @@ func (dev DevicesPerNUMA) Devices() sets.String {
// New returns an instance of Checkpoint - must be an alias for the most recent version // New returns an instance of Checkpoint - must be an alias for the most recent version
func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint { func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return NewV2(devEntries, devices) return newV2(devEntries, devices)
} }
func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint { func newV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return &Data{ return &Data{
Data: checkpointData{ Data: checkpointData{
PodDeviceEntries: devEntries, PodDeviceEntries: devEntries,

View File

@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
) )
// PodDevicesEntry connects pod information to devices, without topology information (k8s <= 1.19) // PodDevicesEntryV1 connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct { type PodDevicesEntryV1 struct {
PodUID string PodUID string
ContainerName string ContainerName string
@ -37,7 +37,7 @@ type PodDevicesEntryV1 struct {
AllocResp []byte AllocResp []byte
} }
// checkpointData struct is used to store pod to device allocation information // checkpointDataV1 struct is used to store pod to device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19) // in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct { type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1 PodDeviceEntries []PodDevicesEntryV1
@ -63,13 +63,13 @@ func (cp checkpointDataV1) checksum() checksum.Checksum {
return checksum.Checksum(hash.Sum32()) return checksum.Checksum(hash.Sum32())
} }
// Data holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format // DataV1 holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct { type DataV1 struct {
Data checkpointDataV1 Data checkpointDataV1
Checksum checksum.Checksum Checksum checksum.Checksum
} }
// New returns an instance of Checkpoint, in V1 (k8s <= 1.19) format. // NewV1 returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats different than the most recent one, // Users should avoid creating checkpoints in formats different than the most recent one,
// use the old formats only to validate existing checkpoint and convert them to most recent // use the old formats only to validate existing checkpoint and convert them to most recent
// format. The only exception should be test code. // format. The only exception should be test code.
@ -90,7 +90,7 @@ func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(*cp) return json.Marshal(*cp)
} }
// MarshalCheckpoint returns marshalled data // UnmarshalCheckpoint returns unmarshalled data
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error { func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp) return json.Unmarshal(blob, cp)
} }

View File

@ -50,7 +50,7 @@ type endpointImpl struct {
// This is to be used during normal device plugin registration. // This is to be used during normal device plugin registration.
func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl { func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl {
return &endpointImpl{ return &endpointImpl{
api: p.Api(), api: p.API(),
resourceName: p.Resource(), resourceName: p.Resource(),
} }
} }

View File

@ -163,6 +163,8 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
return manager, nil return manager, nil
} }
// CleanupPluginDirectory is to remove all existing unix sockets
// from /var/lib/kubelet/device-plugins on Device Plugin Manager start
func (m *ManagerImpl) CleanupPluginDirectory(dir string) error { func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
d, err := os.Open(dir) d, err := os.Open(dir)
if err != nil { if err != nil {
@ -200,8 +202,10 @@ func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
return errorsutil.NewAggregate(errs) return errorsutil.NewAggregate(errs)
} }
// PluginConnected is to connect a plugin to a new endpoint.
// This is done as part of device plugin registration.
func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error { func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error {
options, err := p.Api().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) options, err := p.API().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil { if err != nil {
return fmt.Errorf("failed to get device plugin options: %v", err) return fmt.Errorf("failed to get device plugin options: %v", err)
} }
@ -215,6 +219,8 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin
return nil return nil
} }
// PluginDisconnected is to disconnect a plugin from an endpoint.
// This is done as part of device plugin deregistration.
func (m *ManagerImpl) PluginDisconnected(resourceName string) { func (m *ManagerImpl) PluginDisconnected(resourceName string) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
@ -227,6 +233,10 @@ func (m *ManagerImpl) PluginDisconnected(resourceName string) {
m.endpoints[resourceName].e.setStopTime(time.Now()) m.endpoints[resourceName].e.setStopTime(time.Now())
} }
// PluginListAndWatchReceiver receives ListAndWatchResponse from a device plugin
// and ensures that an upto date state (e.g. number of devices and device health)
// is captured. Also, registered device and device to container allocation
// information is checkpointed to the disk.
func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) { func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
var devices []pluginapi.Device var devices []pluginapi.Device
for _, d := range resp.Devices { for _, d := range resp.Devices {

View File

@ -20,10 +20,13 @@ import (
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
) )
// RegistrationHandler is an interface for handling device plugin registration
// and plugin directory cleanup.
type RegistrationHandler interface { type RegistrationHandler interface {
CleanupPluginDirectory(string) error CleanupPluginDirectory(string) error
} }
// ClientHandler is an interface for handling device plugin connections.
type ClientHandler interface { type ClientHandler interface {
PluginConnected(string, DevicePlugin) error PluginConnected(string, DevicePlugin) error
PluginDisconnected(string) PluginDisconnected(string)

View File

@ -30,12 +30,14 @@ import (
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
) )
// DevicePlugin interface provides methods for accessing Device Plugin resources, API and unix socket.
type DevicePlugin interface { type DevicePlugin interface {
Api() api.DevicePluginClient API() api.DevicePluginClient
Resource() string Resource() string
SocketPath() string SocketPath() string
} }
// Client interface provides methods for establishing/closing gRPC connection and running the device plugin gRPC client.
type Client interface { type Client interface {
Connect() error Connect() error
Run() Run()
@ -51,6 +53,7 @@ type client struct {
client api.DevicePluginClient client api.DevicePluginClient
} }
// NewPluginClient returns an initialized device plugin client.
func NewPluginClient(r string, socketPath string, h ClientHandler) Client { func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
return &client{ return &client{
resource: r, resource: r,
@ -59,6 +62,7 @@ func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
} }
} }
// Connect is for establishing a gRPC connection between device manager and device plugin.
func (c *client) Connect() error { func (c *client) Connect() error {
client, conn, err := dial(c.socket) client, conn, err := dial(c.socket)
if err != nil { if err != nil {
@ -70,6 +74,7 @@ func (c *client) Connect() error {
return c.handler.PluginConnected(c.resource, c) return c.handler.PluginConnected(c.resource, c)
} }
// Run is for running the device plugin gRPC client.
func (c *client) Run() { func (c *client) Run() {
stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{}) stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})
if err != nil { if err != nil {
@ -88,6 +93,7 @@ func (c *client) Run() {
} }
} }
// Disconnect is for closing gRPC connection between device manager and device plugin.
func (c *client) Disconnect() error { func (c *client) Disconnect() error {
c.mutex.Lock() c.mutex.Lock()
if c.grpc != nil { if c.grpc != nil {
@ -105,7 +111,7 @@ func (c *client) Resource() string {
return c.resource return c.resource
} }
func (c *client) Api() api.DevicePluginClient { func (c *client) API() api.DevicePluginClient {
return c.client return c.client
} }

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
) )
// Server interface provides methods for Device plugin registration server.
type Server interface { type Server interface {
cache.PluginHandler cache.PluginHandler
Start() error Start() error
@ -54,6 +55,7 @@ type server struct {
clients map[string]Client clients map[string]Client
} }
// NewServer returns an initialized device plugin registration server.
func NewServer(socketPath string, rh RegistrationHandler, ch ClientHandler) (Server, error) { func NewServer(socketPath string, rh RegistrationHandler, ch ClientHandler) (Server, error) {
if socketPath == "" || !filepath.IsAbs(socketPath) { if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(errBadSocket+" %s", socketPath) return nil, fmt.Errorf(errBadSocket+" %s", socketPath)

View File

@ -343,11 +343,11 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDev
} }
devicePluginMap := make(map[string]pluginapi.Device) devicePluginMap := make(map[string]pluginapi.Device)
for numaid, devlist := range allocateInfo.deviceIds { for numaid, devlist := range allocateInfo.deviceIds {
for _, devId := range devlist { for _, devID := range devlist {
var topology *pluginapi.TopologyInfo var topology *pluginapi.TopologyInfo
if numaid != nodeWithoutTopology { if numaid != nodeWithoutTopology {
NUMANodes := []*pluginapi.NUMANode{{ID: numaid}} NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil { if pDev, ok := devicePluginMap[devID]; ok && pDev.Topology != nil {
if nodes := pDev.Topology.GetNodes(); nodes != nil { if nodes := pDev.Topology.GetNodes(); nodes != nil {
NUMANodes = append(NUMANodes, nodes...) NUMANodes = append(NUMANodes, nodes...)
} }
@ -356,7 +356,7 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDev
// ID and Healthy are not relevant here. // ID and Healthy are not relevant here.
topology = &pluginapi.TopologyInfo{Nodes: NUMANodes} topology = &pluginapi.TopologyInfo{Nodes: NUMANodes}
} }
devicePluginMap[devId] = pluginapi.Device{ devicePluginMap[devID] = pluginapi.Device{
Topology: topology, Topology: topology,
} }
} }
@ -372,10 +372,12 @@ type DeviceInstances map[string]pluginapi.Device
// ResourceDeviceInstances is a mapping resource name -> DeviceInstances // ResourceDeviceInstances is a mapping resource name -> DeviceInstances
type ResourceDeviceInstances map[string]DeviceInstances type ResourceDeviceInstances map[string]DeviceInstances
// NewResourceDeviceInstances returns a new ResourceDeviceInstances
func NewResourceDeviceInstances() ResourceDeviceInstances { func NewResourceDeviceInstances() ResourceDeviceInstances {
return make(ResourceDeviceInstances) return make(ResourceDeviceInstances)
} }
// Clone returns a clone of ResourceDeviceInstances
func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances { func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
clone := NewResourceDeviceInstances() clone := NewResourceDeviceInstances()
for resourceName, resourceDevs := range rdev { for resourceName, resourceDevs := range rdev {

View File

@ -42,14 +42,14 @@ func TestGetContainerDevices(t *testing.T) {
contDevices, ok := resContDevices[resourceName1] contDevices, ok := resContDevices[resourceName1]
require.True(t, ok, "resource %q not present", resourceName1) require.True(t, ok, "resource %q not present", resourceName1)
for devId, plugInfo := range contDevices { for devID, plugInfo := range contDevices {
nodes := plugInfo.GetTopology().GetNodes() nodes := plugInfo.GetTopology().GetNodes()
require.Equal(t, len(nodes), len(devices), "Incorrect container devices: %v - %v (nodes %v)", devices, contDevices, nodes) require.Equal(t, len(nodes), len(devices), "Incorrect container devices: %v - %v (nodes %v)", devices, contDevices, nodes)
for _, node := range plugInfo.GetTopology().GetNodes() { for _, node := range plugInfo.GetTopology().GetNodes() {
dev, ok := devices[node.ID] dev, ok := devices[node.ID]
require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID) require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID)
require.Equal(t, devId, dev[0], "Can't find device %s in result", dev[0]) require.Equal(t, devID, dev[0], "Can't find device %s in result", dev[0])
} }
} }
} }