mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Extends device_plugin_handler to checkpoint device to container allocation information.
This commit is contained in:
parent
775f5d232d
commit
3b2bc58c11
@ -17,7 +17,10 @@ limitations under the License.
|
|||||||
package cm
|
package cm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -108,7 +111,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
|
|||||||
glog.V(2).Infof("Creating Device Plugin Handler")
|
glog.V(2).Infof("Creating Device Plugin Handler")
|
||||||
handler := &DevicePluginHandlerImpl{
|
handler := &DevicePluginHandlerImpl{
|
||||||
allDevices: make(map[string]sets.String),
|
allDevices: make(map[string]sets.String),
|
||||||
allocatedDevices: devicesInUse(),
|
allocatedDevices: make(map[string]podDevices),
|
||||||
}
|
}
|
||||||
|
|
||||||
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
|
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
|
||||||
@ -140,6 +143,11 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
|
|||||||
|
|
||||||
handler.devicePluginManager = mgr
|
handler.devicePluginManager = mgr
|
||||||
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
|
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
|
||||||
|
// Loads in allocatedDevices information from disk.
|
||||||
|
err = handler.readCheckpoint()
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
|
||||||
|
}
|
||||||
return handler, nil
|
return handler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,16 +210,13 @@ func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container,
|
|||||||
}
|
}
|
||||||
ret = append(ret, resp)
|
ret = append(ret, resp)
|
||||||
}
|
}
|
||||||
|
// Checkpoints device to container allocation information.
|
||||||
|
if err := h.writeCheckpoint(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// devicesInUse returns a list of custom devices in use along with the
|
|
||||||
// respective pods that are using them.
|
|
||||||
func devicesInUse() map[string]podDevices {
|
|
||||||
// TODO: gets the initial state from checkpointing.
|
|
||||||
return make(map[string]podDevices)
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateAllocatedDevices updates the list of GPUs in use.
|
// updateAllocatedDevices updates the list of GPUs in use.
|
||||||
// It gets a list of active pods and then frees any GPUs that are bound to
|
// It gets a list of active pods and then frees any GPUs that are bound to
|
||||||
// terminated pods. Returns error on failure.
|
// terminated pods. Returns error on failure.
|
||||||
@ -229,3 +234,60 @@ func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
|
|||||||
podDevs.delete(podsToBeRemoved.List())
|
podDevs.delete(podsToBeRemoved.List())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type checkpointEntry struct {
|
||||||
|
PodUID string
|
||||||
|
ContainerName string
|
||||||
|
ResourceName string
|
||||||
|
DeviceID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkpointData struct is used to store pod to device allocation information
|
||||||
|
// in a checkpoint file.
|
||||||
|
// TODO: add version control when we need to change checkpoint format.
|
||||||
|
type checkpointData struct {
|
||||||
|
Entries []checkpointEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checkpoints device to container allocation information to disk.
|
||||||
|
func (h *DevicePluginHandlerImpl) writeCheckpoint() error {
|
||||||
|
filepath := h.devicePluginManager.CheckpointFile()
|
||||||
|
var data checkpointData
|
||||||
|
for resourceName, podDev := range h.allocatedDevices {
|
||||||
|
for podUID, conDev := range podDev {
|
||||||
|
for conName, devs := range conDev {
|
||||||
|
for _, devId := range devs.UnsortedList() {
|
||||||
|
data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resourceName, devId})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dataJson, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return ioutil.WriteFile(filepath, dataJson, 0644)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reads device to container allocation information from disk, and populates
|
||||||
|
// h.allocatedDevices accordingly.
|
||||||
|
func (h *DevicePluginHandlerImpl) readCheckpoint() error {
|
||||||
|
filepath := h.devicePluginManager.CheckpointFile()
|
||||||
|
content, err := ioutil.ReadFile(filepath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Read checkpoint file %s\n", filepath)
|
||||||
|
var data checkpointData
|
||||||
|
if err := json.Unmarshal(content, &data); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
|
||||||
|
}
|
||||||
|
for _, entry := range data.Entries {
|
||||||
|
glog.V(2).Infof("Get checkpoint entry: %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceID)
|
||||||
|
if h.allocatedDevices[entry.ResourceName] == nil {
|
||||||
|
h.allocatedDevices[entry.ResourceName] = make(podDevices)
|
||||||
|
}
|
||||||
|
h.allocatedDevices[entry.ResourceName].insert(entry.PodUID, entry.ContainerName, entry.DeviceID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -128,6 +128,41 @@ func (m *DevicePluginManagerTestStub) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *DevicePluginManagerTestStub) CheckpointFile() string {
|
||||||
|
return "/tmp/device-plugin-checkpoint"
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckpoint(t *testing.T) {
|
||||||
|
resourceName1 := "domain1.com/resource1"
|
||||||
|
resourceName2 := "domain2.com/resource2"
|
||||||
|
|
||||||
|
m, err := NewDevicePluginManagerTestStub()
|
||||||
|
as := assert.New(t)
|
||||||
|
as.Nil(err)
|
||||||
|
|
||||||
|
testDevicePluginHandler := &DevicePluginHandlerImpl{
|
||||||
|
devicePluginManager: m,
|
||||||
|
allDevices: make(map[string]sets.String),
|
||||||
|
allocatedDevices: make(map[string]podDevices),
|
||||||
|
}
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName1] = make(podDevices)
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev1")
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev2")
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con2", "dev1")
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod2", "con1", "dev1")
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName2] = make(podDevices)
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev3")
|
||||||
|
testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev4")
|
||||||
|
|
||||||
|
err = testDevicePluginHandler.writeCheckpoint()
|
||||||
|
as.Nil(err)
|
||||||
|
expected := testDevicePluginHandler.allocatedDevices
|
||||||
|
testDevicePluginHandler.allocatedDevices = make(map[string]podDevices)
|
||||||
|
err = testDevicePluginHandler.readCheckpoint()
|
||||||
|
as.Nil(err)
|
||||||
|
as.Equal(expected, testDevicePluginHandler.allocatedDevices)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPodContainerDeviceAllocation(t *testing.T) {
|
func TestPodContainerDeviceAllocation(t *testing.T) {
|
||||||
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
|
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
|
||||||
var logLevel string
|
var logLevel string
|
||||||
|
@ -63,7 +63,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeContents(dir string) error {
|
func (m *ManagerImpl) removeContents(dir string) error {
|
||||||
d, err := os.Open(dir)
|
d, err := os.Open(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -74,8 +74,19 @@ func removeContents(dir string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
// TODO: skip checkpoint file and check for file type.
|
filePath := filepath.Join(dir, name)
|
||||||
err = os.RemoveAll(filepath.Join(dir, name))
|
if filePath == m.CheckpointFile() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stat, err := os.Stat(filePath)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to stat file %v: %v", filePath, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if stat.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = os.RemoveAll(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -83,6 +94,11 @@ func removeContents(dir string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckpointFile returns device plugin checkpoint file path.
|
||||||
|
func (m *ManagerImpl) CheckpointFile() string {
|
||||||
|
return filepath.Join(m.socketdir, "kubelet_internal_checkpoint")
|
||||||
|
}
|
||||||
|
|
||||||
// Start starts the Device Plugin Manager
|
// Start starts the Device Plugin Manager
|
||||||
func (m *ManagerImpl) Start() error {
|
func (m *ManagerImpl) Start() error {
|
||||||
glog.V(2).Infof("Starting Device Plugin manager")
|
glog.V(2).Infof("Starting Device Plugin manager")
|
||||||
@ -92,7 +108,7 @@ func (m *ManagerImpl) Start() error {
|
|||||||
|
|
||||||
// Removes all stale sockets in m.socketdir. Device plugins can monitor
|
// Removes all stale sockets in m.socketdir. Device plugins can monitor
|
||||||
// this and use it as a signal to re-register with the new Kubelet.
|
// this and use it as a signal to re-register with the new Kubelet.
|
||||||
if err := removeContents(m.socketdir); err != nil {
|
if err := m.removeContents(m.socketdir); err != nil {
|
||||||
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
|
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ type MonitorCallback func(resourceName string, added, updated, deleted []*plugin
|
|||||||
type Manager interface {
|
type Manager interface {
|
||||||
// Start starts the gRPC Registration service.
|
// Start starts the gRPC Registration service.
|
||||||
Start() error
|
Start() error
|
||||||
|
|
||||||
// Devices is the map of devices that have registered themselves
|
// Devices is the map of devices that have registered themselves
|
||||||
// against the manager.
|
// against the manager.
|
||||||
// The map key is the ResourceName of the device plugins.
|
// The map key is the ResourceName of the device plugins.
|
||||||
@ -40,6 +41,9 @@ type Manager interface {
|
|||||||
|
|
||||||
// Stop stops the manager.
|
// Stop stops the manager.
|
||||||
Stop() error
|
Stop() error
|
||||||
|
|
||||||
|
// Returns checkpoint file path.
|
||||||
|
CheckpointFile() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: evaluate whether we need these error definitions.
|
// TODO: evaluate whether we need these error definitions.
|
||||||
|
Loading…
Reference in New Issue
Block a user