Merge pull request #55382 from vikaschoudhary16/checkpoint

Automatic merge from submit-queue (batch tested with PRs 57172, 55382, 56147, 56146, 56158). If you want to cherry-pick this change to another branch, please follow the instructions at https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Use file store utility for device plugin checkpointing

Partially addresses issue #54088.
cc @sjenning @jeremyeder @jiayingz @vishh 

/sig node
This commit is contained in:
Kubernetes Submit Queue 2017-12-14 12:38:13 -08:00 committed by GitHub
commit 578f3db8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 9 deletions

View File

@ -24,6 +24,8 @@ go_library(
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library", "//vendor/golang.org/x/net/context:go_default_library",
@ -58,6 +60,8 @@ go_test(
deps = [ deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",

View File

@ -19,7 +19,6 @@ package deviceplugin
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
@ -38,6 +37,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
@ -80,6 +81,7 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping. // podDevices contains pod to allocated device mapping.
podDevices podDevices podDevices podDevices
store utilstore.Store
} }
type sourcesReadyStub struct{} type sourcesReadyStub struct{}
@ -114,6 +116,11 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
// Before that, initializes them to perform no-op operations. // Before that, initializes them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} } manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{} manager.sourcesReady = &sourcesReadyStub{}
var err error
manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
if err != nil {
return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
}
return manager, nil return manager, nil
} }
@ -418,22 +425,27 @@ func (m *ManagerImpl) writeCheckpoint() error {
if err != nil { if err != nil {
return err return err
} }
filepath := m.checkpointFile() err = m.store.Write(kubeletDevicePluginCheckpoint, dataJSON)
return ioutil.WriteFile(filepath, dataJSON, 0644) if err != nil {
return fmt.Errorf("failed to write deviceplugin checkpoint file %q: %v", kubeletDevicePluginCheckpoint, err)
}
return nil
} }
// Reads device to container allocation information from disk, and populates // Reads device to container allocation information from disk, and populates
// m.allocatedDevices accordingly. // m.allocatedDevices accordingly.
func (m *ManagerImpl) readCheckpoint() error { func (m *ManagerImpl) readCheckpoint() error {
filepath := m.checkpointFile() content, err := m.store.Read(kubeletDevicePluginCheckpoint)
content, err := ioutil.ReadFile(filepath) if err != nil {
if err != nil && !os.IsNotExist(err) { if err == utilstore.ErrKeyNotFound {
return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) return nil
}
return fmt.Errorf("failed to read checkpoint file %q: %v", kubeletDevicePluginCheckpoint, err)
} }
glog.V(2).Infof("Read checkpoint file %s\n", filepath) glog.V(4).Infof("Read checkpoint file %s\n", kubeletDevicePluginCheckpoint)
var data checkpointData var data checkpointData
if err := json.Unmarshal(content, &data); err != nil { if err := json.Unmarshal(content, &data); err != nil {
return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) return fmt.Errorf("failed to unmarshal deviceplugin checkpoint data: %v", err)
} }
m.mutex.Lock() m.mutex.Lock()

View File

@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
@ -265,6 +267,7 @@ func TestCheckpoint(t *testing.T) {
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices), podDevices: make(podDevices),
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
testManager.podDevices.insert("pod1", "con1", resourceName1, testManager.podDevices.insert("pod1", "con1", resourceName1,
constructDevices([]string{"dev1", "dev2"}), constructDevices([]string{"dev1", "dev2"}),
@ -394,6 +397,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
activePods: podsStub.getActivePods, activePods: podsStub.getActivePods,
sourcesReady: &sourcesReadyStub{}, sourcesReady: &sourcesReadyStub{},
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
testManager.allDevices[resourceName1] = sets.NewString() testManager.allDevices[resourceName1] = sets.NewString()
testManager.allDevices[resourceName1].Insert(devID1) testManager.allDevices[resourceName1].Insert(devID1)
@ -621,6 +625,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices), podDevices: make(podDevices),
} }
testManager.store, _ = utilstore.NewFileStore("/tmp/", utilfs.DefaultFs{})
// require one of resource1 and one of resource2 // require one of resource1 and one of resource2
testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1] = sets.NewString()
testManager.allocatedDevices[resourceName1].Insert(devID1) testManager.allocatedDevices[resourceName1].Insert(devID1)