diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go index 9048c9bffbc..0fb971afd22 100644 --- a/pkg/kubelet/cm/dra/claiminfo.go +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -115,12 +115,12 @@ func (info *ClaimInfo) isPrepared() bool { func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) { checkpointer, err := state.NewCheckpointer(stateDir, checkpointName) if err != nil { - return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err) + return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %w", err) } checkpoint, err := checkpointer.GetOrCreate() if err != nil { - return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err) + return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %w", err) } cache := &claimInfoCache{ @@ -182,9 +182,9 @@ func (cache *claimInfoCache) delete(claimName, namespace string) { // that is referenced by the pod with the given UID // This function is used indirectly by the status manager // to check if pod can enter termination status -func (cache *claimInfoCache) hasPodReference(UID types.UID) bool { +func (cache *claimInfoCache) hasPodReference(uid types.UID) bool { for _, claimInfo := range cache.claimInfo { - if claimInfo.hasPodReference(UID) { + if claimInfo.hasPodReference(uid) { return true } } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index ab8ad8c7e5d..e26e2f7b259 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -70,7 +70,7 @@ type ManagerImpl struct { func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) { claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName) if err != nil { - return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err) + return nil, fmt.Errorf("failed to create claimInfo cache: %w", err) } // TODO: for now the reconcile period is not configurable. @@ -158,7 +158,7 @@ func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error { logger.V(3).Info("Processing resource", "pod", klog.KObj(pod), "podClaim", podClaim.Name) claimName, mustCheckOwner, err := resourceclaim.Name(pod, podClaim) if err != nil { - return fmt.Errorf("prepare resource claim: %v", err) + return fmt.Errorf("prepare resource claim: %w", err) } if claimName == nil { @@ -172,7 +172,7 @@ func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error { *claimName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err) + return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %w", *claimName, pod.Name, err) } if mustCheckOwner { @@ -489,10 +489,10 @@ func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, // PodMightNeedToUnprepareResources returns true if the pod might need to // unprepare resources -func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool { +func (m *ManagerImpl) PodMightNeedToUnprepareResources(uid types.UID) bool { m.cache.Lock() defer m.cache.Unlock() - return m.cache.hasPodReference(UID) + return m.cache.hasPodReference(uid) } // GetContainerClaimInfos gets Container's ClaimInfo diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 30ec100ade4..df101fdeb00 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/fake" "k8s.io/dynamic-resource-allocation/resourceclaim" + "k8s.io/klog/v2" drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4" "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin" "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" @@ -118,7 +119,7 @@ type fakeDRAServerInfo struct { teardownFn tearDown } -func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time.Duration, prepareResourcesResponse *drapb.NodePrepareResourcesResponse, unprepareResourcesResponse *drapb.NodeUnprepareResourcesResponse) (fakeDRAServerInfo, error) { +func setupFakeDRADriverGRPCServer(ctx context.Context, shouldTimeout bool, pluginClientTimeout *time.Duration, prepareResourcesResponse *drapb.NodePrepareResourcesResponse, unprepareResourcesResponse *drapb.NodeUnprepareResourcesResponse) (fakeDRAServerInfo, error) { socketDir, err := os.MkdirTemp("", "dra") if err != nil { return fakeDRAServerInfo{ @@ -133,7 +134,10 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time. teardown := func() { close(stopCh) - os.RemoveAll(socketName) + if err := os.Remove(socketName); err != nil { + logger := klog.FromContext(ctx) + logger.Error(err, "failed to remove socket file", "path", socketName) + } } l, err := net.Listen("unix", socketName) @@ -159,11 +163,16 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool, pluginClientTimeout *time. drapb.RegisterNodeServer(s, fakeDRADriverGRPCServer) - go func() { - go s.Serve(l) + go func(ctx context.Context) { + go func() { + if err := s.Serve(l); err != nil { + logger := klog.FromContext(ctx) + logger.Error(err, "failed to serve gRPC") + } + }() <-stopCh s.GracefulStop() - }() + }(ctx) return fakeDRAServerInfo{ server: fakeDRADriverGRPCServer, @@ -565,7 +574,7 @@ func TestPrepareResources(t *testing.T) { pluginClientTimeout = &timeout } - draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout, test.resp, nil) + draServerInfo, err := setupFakeDRADriverGRPCServer(tCtx, test.wantTimeout, pluginClientTimeout, test.resp, nil) if err != nil { t.Fatal(err) } @@ -702,7 +711,7 @@ func TestUnprepareResources(t *testing.T) { pluginClientTimeout = &timeout } - draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout, pluginClientTimeout, nil, test.resp) + draServerInfo, err := setupFakeDRADriverGRPCServer(tCtx, test.wantTimeout, pluginClientTimeout, nil, test.resp) if err != nil { t.Fatal(err) } @@ -872,7 +881,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { tCtx := ktesting.Init(t) // Setup and register fake DRA driver - draServerInfo, err := setupFakeDRADriverGRPCServer(false, nil, nil, nil) + draServerInfo, err := setupFakeDRADriverGRPCServer(tCtx, false, nil, nil, nil) if err != nil { t.Fatal(err) }