Merge pull request #46105 from sjenning/update-conatiner-resource-cri

Automatic merge from submit-queue (batch tested with PRs 49488, 50407, 46105, 50456, 50258)

Add UpdateContainerResources method to CRI

This is first step toward support for opinionated cpu pinning for certain guaranteed pods.

In order to do this, the kubelet needs to be able to dynamically update the cpuset at the container level, which is managed by the container runtime.  Thus the kubelet needs a method to communicate over the CRI so the runtime can then modify the container cgroup.

This is used in the situation where a core is added or removed from the shared pool to become a exclusive core for a new G pod.  The cpuset for all containers in the shared pool will need to be updated to add or remove that core.

Opening this up now so we can start discussion.  The need for a change to the CRI might be unexpected.

@derekwaynecarr @vishh @ConnorDoyle 

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-08-11 14:14:00 -07:00 committed by GitHub
commit b9b875f0d7
14 changed files with 922 additions and 383 deletions

View File

@ -43,6 +43,8 @@ type ContainerManager interface {
ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
// ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
// UpdateContainerResources updates the cgroup resources for the container.
UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)

View File

@ -362,6 +362,10 @@ func (r *FakeRuntimeService) ContainerStatus(containerID string) (*runtimeapi.Co
return &status, nil
}
func (r *FakeRuntimeService) UpdateContainerResources(string, *runtimeapi.LinuxContainerResources) error {
return nil
}
func (r *FakeRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
r.Lock()
defer r.Unlock()

File diff suppressed because it is too large Load Diff

View File

@ -61,6 +61,8 @@ service RuntimeService {
// ContainerStatus returns status of the container. If the container is not
// present, returns an error.
rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
// UpdateContainerResources updates ContainerConfig of the container.
rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
// ExecSync runs a command in a container synchronously.
rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
@ -445,6 +447,10 @@ message LinuxContainerResources {
int64 memory_limit_in_bytes = 4;
// OOMScoreAdj adjusts the oom-killer score. Default: 0 (not specified).
int64 oom_score_adj = 5;
// CpusetCpus constrains the allowed set of logical CPUs. Default: "" (not specified).
string cpuset_cpus = 6;
// CpusetMems constrains the allowed set of memory nodes. Default: "" (not specified).
string cpuset_mems = 7;
}
// SELinuxOption are the labels to be applied to the container.
@ -772,6 +778,15 @@ message ContainerStatusResponse {
ContainerStatus status = 1;
}
message UpdateContainerResourcesRequest {
// ID of the container to update.
string container_id = 1;
// Resource configuration specific to Linux containers.
LinuxContainerResources linux = 2;
}
message UpdateContainerResourcesResponse {}
message ExecSyncRequest {
// ID of the container.
string container_id = 1;

View File

@ -402,3 +402,22 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.Contai
LogPath: r.Config.Labels[containerLogPathLabelKey],
}, nil
}
func (ds *dockerService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error {
updateConfig := dockercontainer.UpdateConfig{
Resources: dockercontainer.Resources{
CPUPeriod: resources.CpuPeriod,
CPUQuota: resources.CpuQuota,
CPUShares: resources.CpuShares,
Memory: resources.MemoryLimitInBytes,
CpusetCpus: resources.CpusetCpus,
CpusetMems: resources.CpusetMems,
},
}
err := ds.client.UpdateContainerResources(containerID, updateConfig)
if err != nil {
return fmt.Errorf("failed to update container %q: %v", containerID, err)
}
return nil
}

View File

@ -48,6 +48,7 @@ type Interface interface {
CreateContainer(dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error)
StartContainer(id string) error
StopContainer(id string, timeout time.Duration) error
UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error
RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
InspectImageByRef(imageRef string) (*dockertypes.ImageInspect, error)
InspectImageByID(imageID string) (*dockertypes.ImageInspect, error)

View File

@ -625,6 +625,10 @@ func (f *FakeDockerClient) RemoveContainer(id string, opts dockertypes.Container
return fmt.Errorf("container not stopped")
}
func (f *FakeDockerClient) UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error {
return nil
}
// Logs is a test-spy implementation of Interface.Logs.
// It adds an entry "logs" to the internal method call record.
func (f *FakeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {

View File

@ -108,6 +108,15 @@ func (in instrumentedInterface) RemoveContainer(id string, opts dockertypes.Cont
return err
}
func (in instrumentedInterface) UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error {
const operation = "update_container"
defer recordOperation(operation, time.Now())
err := in.client.UpdateContainerResources(id, updateConfig)
recordError(operation, err)
return err
}
func (in instrumentedInterface) InspectImageByRef(image string) (*dockertypes.ImageInspect, error) {
const operation = "inspect_image"
defer recordOperation(operation, time.Now())

View File

@ -171,6 +171,16 @@ func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.Container
return err
}
func (d *kubeDockerClient) UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
_, err := d.client.ContainerUpdate(ctx, id, updateConfig)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) inspectImageRaw(ref string) (*dockertypes.ImageInspect, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()

View File

@ -147,6 +147,14 @@ func (d *dockerService) ContainerStatus(ctx context.Context, r *runtimeapi.Conta
return &runtimeapi.ContainerStatusResponse{Status: status}, nil
}
func (d *dockerService) UpdateContainerResources(ctx context.Context, r *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) {
err := d.runtimeService.UpdateContainerResources(r.ContainerId, r.Linux)
if err != nil {
return nil, err
}
return &runtimeapi.UpdateContainerResourcesResponse{}, nil
}
func (d *dockerService) ExecSync(ctx context.Context, r *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
stdout, stderr, err := d.runtimeService.ExecSync(r.ContainerId, r.Cmd, time.Duration(r.Timeout)*time.Second)
var exitCode int32

View File

@ -131,6 +131,15 @@ func (in instrumentedRuntimeService) ContainerStatus(containerID string) (*runti
return out, err
}
func (in instrumentedRuntimeService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error {
const operation = "container_status"
defer recordOperation(operation, time.Now())
err := in.service.UpdateContainerResources(containerID, resources)
recordError(operation, err)
return err
}
func (in instrumentedRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) ([]byte, []byte, error) {
const operation = "exec_sync"
defer recordOperation(operation, time.Now())

View File

@ -288,6 +288,23 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.
return resp.Status, nil
}
// UpdateContainerResources updates a containers resource config
func (r *RemoteRuntimeService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
_, err := r.runtimeClient.UpdateContainerResources(ctx, &runtimeapi.UpdateContainerResourcesRequest{
ContainerId: containerID,
Linux: resources,
})
if err != nil {
glog.Errorf("UpdateContainerResources %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {

View File

@ -65,6 +65,11 @@ func (*Runtime) ContainerStatus(string) (*runtimeapi.ContainerStatus, error) {
panic("not implemented")
}
// UpdateContainerResources updates the resource constraints for the container.
func (*Runtime) UpdateContainerResources(string, *runtimeapi.LinuxContainerResources) error {
panic("not implemented")
}
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (*Runtime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {

View File

@ -205,6 +205,10 @@ func (r *FakeRuntime) ContainerStatus(id string) (*runtimeapi.ContainerStatus, e
return &c.Status, nil
}
func (r *FakeRuntime) UpdateContainerResources(string, *runtimeapi.LinuxContainerResources) error {
return nil
}
func (r *FakeRuntime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
c, ok := r.Containers[containerID]
if !ok {