From e501f01d8593a451ba7c30b00d738d58cecad0b4 Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Tue, 24 Oct 2017 14:39:12 -0700 Subject: [PATCH] Move podDevices code into a separate file. --- .../cm/deviceplugin/device_plugin_handler.go | 226 ++--------------- pkg/kubelet/cm/deviceplugin/pod_devices.go | 228 ++++++++++++++++++ 2 files changed, 242 insertions(+), 212 deletions(-) create mode 100644 pkg/kubelet/cm/deviceplugin/pod_devices.go diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go index d223fa4a824..23d67c3fd6d 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go @@ -29,210 +29,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) -type deviceAllocateInfo struct { - // deviceIds contains device Ids allocated to this container for the given resourceName. - deviceIds sets.String - // allocResp contains cached rpc AllocateResponse. - allocResp *pluginapi.AllocateResponse -} - -type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName. -type containerDevices map[string]resourceAllocateInfo // Keyed by containerName. -type podDevices map[string]containerDevices // Keyed by podUID. - -func (pdev podDevices) pods() sets.String { - ret := sets.NewString() - for k := range pdev { - ret.Insert(k) - } - return ret -} -func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.AllocateResponse) { - if _, podExists := pdev[podUID]; !podExists { - pdev[podUID] = make(containerDevices) - } - if _, contExists := pdev[podUID][contName]; !contExists { - pdev[podUID][contName] = make(resourceAllocateInfo) - } - pdev[podUID][contName][resource] = deviceAllocateInfo{ - deviceIds: devices, - allocResp: resp, - } -} - -func (pdev podDevices) delete(pods []string) { - for _, uid := range pods { - delete(pdev, uid) - } -} - -// Returns list of device Ids allocated to the given container for the given resource. -// Returns nil if we don't have cached state for the given . -func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String { - if _, podExists := pdev[podUID]; !podExists { - return nil - } - if _, contExists := pdev[podUID][contName]; !contExists { - return nil - } - devs, resourceExists := pdev[podUID][contName][resource] - if !resourceExists { - return nil - } - return devs.deviceIds -} - -// Returns all of devices allocated to the pods being tracked, keyed by resourceName. -func (pdev podDevices) devices() map[string]sets.String { - ret := make(map[string]sets.String) - for _, containerDevices := range pdev { - for _, resources := range containerDevices { - for resource, devices := range resources { - if _, exists := ret[resource]; !exists { - ret[resource] = sets.NewString() - } - ret[resource] = ret[resource].Union(devices.deviceIds) - } - } - } - return ret -} - -type checkpointEntry struct { - PodUID string - ContainerName string - ResourceName string - DeviceIDs []string - AllocResp []byte -} - -// checkpointData struct is used to store pod to device allocation information -// in a checkpoint file. -// TODO: add version control when we need to change checkpoint format. -type checkpointData struct { - Entries []checkpointEntry -} - -// Turns podDevices to checkpointData. -func (pdev podDevices) toCheckpointData() checkpointData { - var data checkpointData - for podUID, containerDevices := range pdev { - for conName, resources := range containerDevices { - for resource, devices := range resources { - devIds := devices.deviceIds.UnsortedList() - allocResp, err := devices.allocResp.Marshal() - if err != nil { - glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) - continue - } - data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp}) - } - } - } - return data -} - -// Populates podDevices from the passed in checkpointData. -func (pdev podDevices) fromCheckpointData(data checkpointData) { - for _, entry := range data.Entries { - glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", - entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) - devIDs := sets.NewString() - for _, devID := range entry.DeviceIDs { - devIDs.Insert(devID) - } - allocResp := &pluginapi.AllocateResponse{} - err := allocResp.Unmarshal(entry.AllocResp) - if err != nil { - glog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err) - continue - } - pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp) - } -} - -// Returns combined container runtime settings to consume the container's allocated devices. -func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions { - containers, exists := pdev[podUID] - if !exists { - return nil - } - resources, exists := containers[contName] - if !exists { - return nil - } - opts := &DeviceRunContainerOptions{} - // Maps to detect duplicate settings. - devsMap := make(map[string]string) - mountsMap := make(map[string]string) - envsMap := make(map[string]string) - // Loops through AllocationResponses of all cached device resources. - for _, devices := range resources { - resp := devices.allocResp - // Each Allocate response has the following artifacts. - // Environment variables - // Mount points - // Device files - // These artifacts are per resource per container. - // Updates RunContainerOptions.Envs. - for k, v := range resp.Envs { - if e, ok := envsMap[k]; ok { - glog.V(3).Infof("skip existing env %s %s", k, v) - if e != v { - glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v) - } - continue - } - glog.V(4).Infof("add env %s %s", k, v) - envsMap[k] = v - opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v}) - } - - // Updates RunContainerOptions.Devices. - for _, dev := range resp.Devices { - if d, ok := devsMap[dev.ContainerPath]; ok { - glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath) - if d != dev.HostPath { - glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s", - dev.ContainerPath, d, dev.HostPath) - } - continue - } - glog.V(4).Infof("add device %s %s", dev.ContainerPath, dev.HostPath) - devsMap[dev.ContainerPath] = dev.HostPath - opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{ - PathOnHost: dev.HostPath, - PathInContainer: dev.ContainerPath, - Permissions: dev.Permissions, - }) - } - // Updates RunContainerOptions.Mounts. - for _, mount := range resp.Mounts { - if m, ok := mountsMap[mount.ContainerPath]; ok { - glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath) - if m != mount.HostPath { - glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s", - mount.ContainerPath, m, mount.HostPath) - } - continue - } - glog.V(4).Infof("add mount %s %s", mount.ContainerPath, mount.HostPath) - mountsMap[mount.ContainerPath] = mount.HostPath - opts.Mounts = append(opts.Mounts, kubecontainer.Mount{ - Name: mount.ContainerPath, - ContainerPath: mount.ContainerPath, - HostPath: mount.HostPath, - ReadOnly: mount.ReadOnly, - // TODO: This may need to be part of Device plugin API. - SELinuxRelabel: false, - }) - } - } - return opts -} // Handler defines the functions used to manage and access device plugin resources. type Handler interface { // Start starts device plugin registration service. @@ -405,19 +203,21 @@ func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods // and container2 both require X resource A and Y resource B. Both allocation // requests may fail if we serve them in mixed order. // TODO: may revisit this part later if we see inefficient resource allocation - // in real use as the result of this. + // in real use as the result of this. Should also consider to parallize device + // plugin Allocate grpc calls if it becomes common that a container may require + // resources from multiple device plugins. resp, err := h.devicePluginManager.Allocate(resource, allocDevices.UnsortedList()) if err != nil { + // In case of allocation failure, we want to restore h.allocatedDevices + // to the actual allocated state from h.podDevices. + h.Lock() + h.allocatedDevices = h.podDevices.devices() + h.Unlock() return err } // Update internal cached podDevices state. h.Lock() - if oldDevices := h.podDevices.containerDevices(podUID, contName, resource); oldDevices != nil { - freeDevices := oldDevices.Difference(allocDevices) - glog.V(3).Infof("old %v alloc %v free devices %v for resource %s", oldDevices.UnsortedList(), allocDevices.UnsortedList(), freeDevices.UnsortedList(), resource) - h.allocatedDevices[resource] = h.allocatedDevices[resource].Difference(freeDevices) - } h.podDevices.insert(podUID, contName, resource, allocDevices, resp) h.Unlock() } @@ -458,21 +258,20 @@ func (h *HandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) { // Checkpoints device to container allocation information to disk. func (h *HandlerImpl) writeCheckpoint() error { h.Lock() - defer h.Unlock() - filepath := h.devicePluginManager.CheckpointFile() data := h.podDevices.toCheckpointData() + h.Unlock() + dataJSON, err := json.Marshal(data) if err != nil { return err } + filepath := h.devicePluginManager.CheckpointFile() return ioutil.WriteFile(filepath, dataJSON, 0644) } // Reads device to container allocation information from disk, and populates // h.allocatedDevices accordingly. func (h *HandlerImpl) readCheckpoint() error { - h.Lock() - defer h.Unlock() filepath := h.devicePluginManager.CheckpointFile() content, err := ioutil.ReadFile(filepath) if err != nil && !os.IsNotExist(err) { @@ -483,6 +282,9 @@ func (h *HandlerImpl) readCheckpoint() error { if err := json.Unmarshal(content, &data); err != nil { return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) } + + h.Lock() + defer h.Unlock() h.podDevices.fromCheckpointData(data) h.allocatedDevices = h.podDevices.devices() return nil diff --git a/pkg/kubelet/cm/deviceplugin/pod_devices.go b/pkg/kubelet/cm/deviceplugin/pod_devices.go new file mode 100644 index 00000000000..e343321e221 --- /dev/null +++ b/pkg/kubelet/cm/deviceplugin/pod_devices.go @@ -0,0 +1,228 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deviceplugin + +import ( + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/util/sets" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +type deviceAllocateInfo struct { + // deviceIds contains device Ids allocated to this container for the given resourceName. + deviceIds sets.String + // allocResp contains cached rpc AllocateResponse. + allocResp *pluginapi.AllocateResponse +} + +type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName. +type containerDevices map[string]resourceAllocateInfo // Keyed by containerName. +type podDevices map[string]containerDevices // Keyed by podUID. + +func (pdev podDevices) pods() sets.String { + ret := sets.NewString() + for k := range pdev { + ret.Insert(k) + } + return ret +} + +func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.AllocateResponse) { + if _, podExists := pdev[podUID]; !podExists { + pdev[podUID] = make(containerDevices) + } + if _, contExists := pdev[podUID][contName]; !contExists { + pdev[podUID][contName] = make(resourceAllocateInfo) + } + pdev[podUID][contName][resource] = deviceAllocateInfo{ + deviceIds: devices, + allocResp: resp, + } +} + +func (pdev podDevices) delete(pods []string) { + for _, uid := range pods { + delete(pdev, uid) + } +} + +// Returns list of device Ids allocated to the given container for the given resource. +// Returns nil if we don't have cached state for the given . +func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String { + if _, podExists := pdev[podUID]; !podExists { + return nil + } + if _, contExists := pdev[podUID][contName]; !contExists { + return nil + } + devs, resourceExists := pdev[podUID][contName][resource] + if !resourceExists { + return nil + } + return devs.deviceIds +} + +// Returns all of devices allocated to the pods being tracked, keyed by resourceName. +func (pdev podDevices) devices() map[string]sets.String { + ret := make(map[string]sets.String) + for _, containerDevices := range pdev { + for _, resources := range containerDevices { + for resource, devices := range resources { + if _, exists := ret[resource]; !exists { + ret[resource] = sets.NewString() + } + ret[resource] = ret[resource].Union(devices.deviceIds) + } + } + } + return ret +} + +type checkpointEntry struct { + PodUID string + ContainerName string + ResourceName string + DeviceIDs []string + AllocResp []byte +} + +// checkpointData struct is used to store pod to device allocation information +// in a checkpoint file. +// TODO: add version control when we need to change checkpoint format. +type checkpointData struct { + Entries []checkpointEntry +} + +// Turns podDevices to checkpointData. +func (pdev podDevices) toCheckpointData() checkpointData { + var data checkpointData + for podUID, containerDevices := range pdev { + for conName, resources := range containerDevices { + for resource, devices := range resources { + devIds := devices.deviceIds.UnsortedList() + allocResp, err := devices.allocResp.Marshal() + if err != nil { + glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err) + continue + } + data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp}) + } + } + } + return data +} + +// Populates podDevices from the passed in checkpointData. +func (pdev podDevices) fromCheckpointData(data checkpointData) { + for _, entry := range data.Entries { + glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", + entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) + devIDs := sets.NewString() + for _, devID := range entry.DeviceIDs { + devIDs.Insert(devID) + } + allocResp := &pluginapi.AllocateResponse{} + err := allocResp.Unmarshal(entry.AllocResp) + if err != nil { + glog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err) + continue + } + pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp) + } +} + +// Returns combined container runtime settings to consume the container's allocated devices. +func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions { + containers, exists := pdev[podUID] + if !exists { + return nil + } + resources, exists := containers[contName] + if !exists { + return nil + } + opts := &DeviceRunContainerOptions{} + // Maps to detect duplicate settings. + devsMap := make(map[string]string) + mountsMap := make(map[string]string) + envsMap := make(map[string]string) + // Loops through AllocationResponses of all cached device resources. + for _, devices := range resources { + resp := devices.allocResp + // Each Allocate response has the following artifacts. + // Environment variables + // Mount points + // Device files + // These artifacts are per resource per container. + // Updates RunContainerOptions.Envs. + for k, v := range resp.Envs { + if e, ok := envsMap[k]; ok { + glog.V(3).Infof("skip existing env %s %s", k, v) + if e != v { + glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v) + } + continue + } + glog.V(4).Infof("add env %s %s", k, v) + envsMap[k] = v + opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v}) + } + + // Updates RunContainerOptions.Devices. + for _, dev := range resp.Devices { + if d, ok := devsMap[dev.ContainerPath]; ok { + glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath) + if d != dev.HostPath { + glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s", + dev.ContainerPath, d, dev.HostPath) + } + continue + } + glog.V(4).Infof("add device %s %s", dev.ContainerPath, dev.HostPath) + devsMap[dev.ContainerPath] = dev.HostPath + opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{ + PathOnHost: dev.HostPath, + PathInContainer: dev.ContainerPath, + Permissions: dev.Permissions, + }) + } + // Updates RunContainerOptions.Mounts. + for _, mount := range resp.Mounts { + if m, ok := mountsMap[mount.ContainerPath]; ok { + glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath) + if m != mount.HostPath { + glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s", + mount.ContainerPath, m, mount.HostPath) + } + continue + } + glog.V(4).Infof("add mount %s %s", mount.ContainerPath, mount.HostPath) + mountsMap[mount.ContainerPath] = mount.HostPath + opts.Mounts = append(opts.Mounts, kubecontainer.Mount{ + Name: mount.ContainerPath, + ContainerPath: mount.ContainerPath, + HostPath: mount.HostPath, + ReadOnly: mount.ReadOnly, + // TODO: This may need to be part of Device plugin API. + SELinuxRelabel: false, + }) + } + } + return opts +}