diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD index 9c91c2e9df2..8474da6fac7 100644 --- a/pkg/kubelet/cm/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -24,6 +24,8 @@ go_library( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/metrics:go_default_library", + "//pkg/kubelet/util/store:go_default_library", + "//pkg/util/filesystem:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", @@ -58,6 +60,8 @@ go_test( deps = [ "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", + "//pkg/kubelet/util/store:go_default_library", + "//pkg/util/filesystem:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go index db28e36d5d6..dea81c1f1b7 100644 --- a/pkg/kubelet/cm/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -19,7 +19,6 @@ package deviceplugin import ( "encoding/json" "fmt" - "io/ioutil" "net" "os" "path/filepath" @@ -38,6 +37,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" + utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -80,6 +81,7 @@ type ManagerImpl struct { // podDevices contains pod to allocated device mapping. podDevices podDevices + store utilstore.Store } type sourcesReadyStub struct{} @@ -114,6 +116,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { // Before that, initializes them to perform no-op operations. manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } manager.sourcesReady = &sourcesReadyStub{} + var err error + manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{}) + if err != nil { + return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err) + } return manager, nil } @@ -418,22 +425,27 @@ func (m *ManagerImpl) writeCheckpoint() error { if err != nil { return err } - filepath := m.checkpointFile() - return ioutil.WriteFile(filepath, dataJSON, 0644) + err = m.store.Write(kubeletDevicePluginCheckpoint, dataJSON) + if err != nil { + return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDevicePluginCheckpoint, err) + } + return nil } // Reads device to container allocation information from disk, and populates // m.allocatedDevices accordingly. func (m *ManagerImpl) readCheckpoint() error { - filepath := m.checkpointFile() - content, err := ioutil.ReadFile(filepath) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) + content, err := m.store.Read(kubeletDevicePluginCheckpoint) + if err != nil { + if err == utilstore.ErrKeyNotFound { + return nil + } + return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDevicePluginCheckpoint, err) } - glog.V(2).Infof("Read checkpoint file %s\n", filepath) + glog.V(4).Infof("Read checkpoint file %s\n", kubeletDevicePluginCheckpoint) var data checkpointData if err := json.Unmarshal(content, &data); err != nil { - return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) + return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err) } m.mutex.Lock() diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go index 7cd9a1aa9f1..71d7324e46e 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_test.go +++ b/pkg/kubelet/cm/deviceplugin/manager_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + utilstore "k8s.io/kubernetes/pkg/kubelet/util/store" + utilfs "k8s.io/kubernetes/pkg/util/filesystem" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -265,6 +267,7 @@ func TestCheckpoint(t *testing.T) { allocatedDevices: make(map[string]sets.String), podDevices: make(podDevices), } + testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) testManager.podDevices.insert("pod1", "con1", resourceName1, constructDevices([]string{"dev1", "dev2"}), @@ -394,6 +397,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { activePods: podsStub.getActivePods, sourcesReady: &sourcesReadyStub{}, } + testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) testManager.allDevices[resourceName1] = sets.NewString() testManager.allDevices[resourceName1].Insert(devID1) @@ -621,6 +625,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { allocatedDevices: make(map[string]sets.String), podDevices: make(podDevices), } + testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{}) // require one of resource1 and one of resource2 testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1].Insert(devID1)