Merge pull request #51744 from jiayingz/deviceplugin-checkpoint

Automatic merge from submit-queue (batch tested with PRs 50072, 51744)

Deviceplugin checkpoint

**What this PR does / why we need it**:
Extends on top of PR 51209 to checkpoint device to pod allocation information on Kubelet to recover from Kubelet restarts.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
```
This commit is contained in:
Kubernetes Submit Queue 2017-09-05 13:33:01 -07:00 committed by GitHub
commit 8b9e8cf80a
4 changed files with 129 additions and 12 deletions

View File

@ -17,7 +17,10 @@ limitations under the License.
package cm
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/golang/glog"
@ -108,7 +111,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
glog.V(2).Infof("Creating Device Plugin Handler")
handler := &DevicePluginHandlerImpl{
allDevices: make(map[string]sets.String),
allocatedDevices: devicesInUse(),
allocatedDevices: make(map[string]podDevices),
}
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
@ -140,6 +143,11 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
handler.devicePluginManager = mgr
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
// Loads in allocatedDevices information from disk.
err = handler.readCheckpoint()
if err != nil {
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
return handler, nil
}
@ -202,16 +210,13 @@ func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container,
}
ret = append(ret, resp)
}
// Checkpoints device to container allocation information.
if err := h.writeCheckpoint(); err != nil {
return nil, err
}
return ret, nil
}
// devicesInUse returns a list of custom devices in use along with the
// respective pods that are using them.
func devicesInUse() map[string]podDevices {
// TODO: gets the initial state from checkpointing.
return make(map[string]podDevices)
}
// updateAllocatedDevices updates the list of GPUs in use.
// It gets a list of active pods and then frees any GPUs that are bound to
// terminated pods. Returns error on failure.
@ -229,3 +234,60 @@ func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
podDevs.delete(podsToBeRemoved.List())
}
}
type checkpointEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceID string
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
Entries []checkpointEntry
}
// Checkpoints device to container allocation information to disk.
func (h *DevicePluginHandlerImpl) writeCheckpoint() error {
filepath := h.devicePluginManager.CheckpointFile()
var data checkpointData
for resourceName, podDev := range h.allocatedDevices {
for podUID, conDev := range podDev {
for conName, devs := range conDev {
for _, devId := range devs.UnsortedList() {
data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resourceName, devId})
}
}
}
}
dataJson, err := json.Marshal(data)
if err != nil {
return err
}
return ioutil.WriteFile(filepath, dataJson, 0644)
}
// Reads device to container allocation information from disk, and populates
// h.allocatedDevices accordingly.
func (h *DevicePluginHandlerImpl) readCheckpoint() error {
filepath := h.devicePluginManager.CheckpointFile()
content, err := ioutil.ReadFile(filepath)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
}
glog.V(2).Infof("Read checkpoint file %s\n", filepath)
var data checkpointData
if err := json.Unmarshal(content, &data); err != nil {
return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
}
for _, entry := range data.Entries {
glog.V(2).Infof("Get checkpoint entry: %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceID)
if h.allocatedDevices[entry.ResourceName] == nil {
h.allocatedDevices[entry.ResourceName] = make(podDevices)
}
h.allocatedDevices[entry.ResourceName].insert(entry.PodUID, entry.ContainerName, entry.DeviceID)
}
return nil
}

View File

@ -128,6 +128,41 @@ func (m *DevicePluginManagerTestStub) Stop() error {
return nil
}
func (m *DevicePluginManagerTestStub) CheckpointFile() string {
return "/tmp/device-plugin-checkpoint"
}
func TestCheckpoint(t *testing.T) {
resourceName1 := "domain1.com/resource1"
resourceName2 := "domain2.com/resource2"
m, err := NewDevicePluginManagerTestStub()
as := assert.New(t)
as.Nil(err)
testDevicePluginHandler := &DevicePluginHandlerImpl{
devicePluginManager: m,
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]podDevices),
}
testDevicePluginHandler.allocatedDevices[resourceName1] = make(podDevices)
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev1")
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev2")
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con2", "dev1")
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod2", "con1", "dev1")
testDevicePluginHandler.allocatedDevices[resourceName2] = make(podDevices)
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev3")
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev4")
err = testDevicePluginHandler.writeCheckpoint()
as.Nil(err)
expected := testDevicePluginHandler.allocatedDevices
testDevicePluginHandler.allocatedDevices = make(map[string]podDevices)
err = testDevicePluginHandler.readCheckpoint()
as.Nil(err)
as.Equal(expected, testDevicePluginHandler.allocatedDevices)
}
func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
var logLevel string

View File

@ -63,7 +63,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
}, nil
}
func removeContents(dir string) error {
func (m *ManagerImpl) removeContents(dir string) error {
d, err := os.Open(dir)
if err != nil {
return err
@ -74,8 +74,19 @@ func removeContents(dir string) error {
return err
}
for _, name := range names {
// TODO: skip checkpoint file and check for file type.
err = os.RemoveAll(filepath.Join(dir, name))
filePath := filepath.Join(dir, name)
if filePath == m.CheckpointFile() {
continue
}
stat, err := os.Stat(filePath)
if err != nil {
glog.Errorf("Failed to stat file %v: %v", filePath, err)
continue
}
if stat.IsDir() {
continue
}
err = os.RemoveAll(filePath)
if err != nil {
return err
}
@ -83,6 +94,11 @@ func removeContents(dir string) error {
return nil
}
// CheckpointFile returns device plugin checkpoint file path.
func (m *ManagerImpl) CheckpointFile() string {
return filepath.Join(m.socketdir, "kubelet_internal_checkpoint")
}
// Start starts the Device Plugin Manager
func (m *ManagerImpl) Start() error {
glog.V(2).Infof("Starting Device Plugin manager")
@ -92,7 +108,7 @@ func (m *ManagerImpl) Start() error {
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := removeContents(m.socketdir); err != nil {
if err := m.removeContents(m.socketdir); err != nil {
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
}

View File

@ -29,6 +29,7 @@ type MonitorCallback func(resourceName string, added, updated, deleted []*plugin
type Manager interface {
// Start starts the gRPC Registration service.
Start() error
// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins.
@ -40,6 +41,9 @@ type Manager interface {
// Stop stops the manager.
Stop() error
// Returns checkpoint file path.
CheckpointFile() string
}
// TODO: evaluate whether we need these error definitions.