Merge pull request #58548 from yujuhong/simplify-ds

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

dockershim: remove the use of kubelet's internal API

We let dockershim implement the kubelet's internal (CRI) API as an
intermediary step before transitioning fully to communicate using gRPC.
Now that kubelet has been communicating to the runtime over gRPC for
multiple releases, we can safely retire the extra interface in
dockershim.

This PR also moves the legacy functions to a separate file and clean up
the interfaces.
This commit is contained in:
Kubernetes Submit Queue 2018-01-19 20:45:07 -08:00 committed by GitHub
commit bfac95e71b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 454 additions and 571 deletions

View File

@ -998,10 +998,6 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf
if err != nil { if err != nil {
return err return err
} }
if err := ds.Start(); err != nil {
return err
}
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil { if err := server.Start(); err != nil {

View File

@ -178,7 +178,6 @@ pkg/kubelet/custommetrics
pkg/kubelet/dockershim pkg/kubelet/dockershim
pkg/kubelet/dockershim/cm pkg/kubelet/dockershim/cm
pkg/kubelet/dockershim/libdocker pkg/kubelet/dockershim/libdocker
pkg/kubelet/dockershim/remote
pkg/kubelet/dockershim/testing pkg/kubelet/dockershim/testing
pkg/kubelet/events pkg/kubelet/events
pkg/kubelet/gpu pkg/kubelet/gpu

View File

@ -14,6 +14,7 @@ go_library(
"docker_checkpoint.go", "docker_checkpoint.go",
"docker_container.go", "docker_container.go",
"docker_image.go", "docker_image.go",
"docker_legacy_service.go",
"docker_sandbox.go", "docker_sandbox.go",
"docker_service.go", "docker_service.go",
"docker_streaming.go", "docker_streaming.go",
@ -83,7 +84,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim", importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim",
deps = [ deps = [
"//pkg/credentialprovider:go_default_library", "//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
@ -91,6 +91,7 @@ go_library(
"//pkg/kubelet/dockershim/cm:go_default_library", "//pkg/kubelet/dockershim/cm:go_default_library",
"//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library",
"//pkg/kubelet/dockershim/metrics:go_default_library", "//pkg/kubelet/dockershim/metrics:go_default_library",
"//pkg/kubelet/kuberuntime:go_default_library",
"//pkg/kubelet/leaky:go_default_library", "//pkg/kubelet/leaky:go_default_library",
"//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/cni:go_default_library", "//pkg/kubelet/network/cni:go_default_library",
@ -115,11 +116,13 @@ go_library(
"//vendor/github.com/docker/docker/pkg/jsonmessage:go_default_library", "//vendor/github.com/docker/docker/pkg/jsonmessage:go_default_library",
"//vendor/github.com/docker/go-connections/nat:go_default_library", "//vendor/github.com/docker/go-connections/nat:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )
@ -165,6 +168,7 @@ go_test(
"//vendor/github.com/golang/mock/gomock:go_default_library", "//vendor/github.com/golang/mock/gomock:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
], ],
) )

View File

@ -27,13 +27,15 @@ import (
dockerfilters "github.com/docker/docker/api/types/filters" dockerfilters "github.com/docker/docker/api/types/filters"
dockerstrslice "github.com/docker/docker/api/types/strslice" dockerstrslice "github.com/docker/docker/api/types/strslice"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
) )
// ListContainers lists all containers matching the filter. // ListContainers lists all containers matching the filter.
func (ds *dockerService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) { func (ds *dockerService) ListContainers(_ context.Context, r *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) {
filter := r.GetFilter()
opts := dockertypes.ContainerListOptions{All: true} opts := dockertypes.ContainerListOptions{All: true}
opts.Filters = dockerfilters.NewArgs() opts.Filters = dockerfilters.NewArgs()
@ -75,19 +77,24 @@ func (ds *dockerService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*
result = append(result, converted) result = append(result, converted)
} }
return result, nil
return &runtimeapi.ListContainersResponse{Containers: result}, nil
} }
// CreateContainer creates a new container in the given PodSandbox // CreateContainer creates a new container in the given PodSandbox
// Docker cannot store the log to an arbitrary location (yet), so we create an // Docker cannot store the log to an arbitrary location (yet), so we create an
// symlink at LogPath, linking to the actual path of the log. // symlink at LogPath, linking to the actual path of the log.
// TODO: check if the default values returned by the runtime API are ok. // TODO: check if the default values returned by the runtime API are ok.
func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) { func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) {
podSandboxID := r.PodSandboxId
config := r.GetConfig()
sandboxConfig := r.GetSandboxConfig()
if config == nil { if config == nil {
return "", fmt.Errorf("container config is nil") return nil, fmt.Errorf("container config is nil")
} }
if sandboxConfig == nil { if sandboxConfig == nil {
return "", fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name) return nil, fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name)
} }
labels := makeLabels(config.GetLabels(), config.GetAnnotations()) labels := makeLabels(config.GetLabels(), config.GetAnnotations())
@ -100,7 +107,7 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi
apiVersion, err := ds.getDockerAPIVersion() apiVersion, err := ds.getDockerAPIVersion()
if err != nil { if err != nil {
return "", fmt.Errorf("unable to get the docker API version: %v", err) return nil, fmt.Errorf("unable to get the docker API version: %v", err)
} }
image := "" image := ""
@ -147,7 +154,7 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi
securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSeparator) securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSeparator)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err)
} }
hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...) hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...)
@ -158,9 +165,9 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi
} }
if createResp != nil { if createResp != nil {
return createResp.ID, err return &runtimeapi.CreateContainerResponse{ContainerId: createResp.ID}, nil
} }
return "", err return nil, err
} }
// getContainerLogPath returns the container log path specified by kubelet and the real // getContainerLogPath returns the container log path specified by kubelet and the real
@ -229,45 +236,49 @@ func (ds *dockerService) removeContainerLogSymlink(containerID string) error {
} }
// StartContainer starts the container. // StartContainer starts the container.
func (ds *dockerService) StartContainer(containerID string) error { func (ds *dockerService) StartContainer(_ context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) {
err := ds.client.StartContainer(containerID) err := ds.client.StartContainer(r.ContainerId)
// Create container log symlink for all containers (including failed ones). // Create container log symlink for all containers (including failed ones).
if linkError := ds.createContainerLogSymlink(containerID); linkError != nil { if linkError := ds.createContainerLogSymlink(r.ContainerId); linkError != nil {
// Do not stop the container if we failed to create symlink because: // Do not stop the container if we failed to create symlink because:
// 1. This is not a critical failure. // 1. This is not a critical failure.
// 2. We don't have enough information to properly stop container here. // 2. We don't have enough information to properly stop container here.
// Kubelet will surface this error to user via an event. // Kubelet will surface this error to user via an event.
return linkError return nil, linkError
} }
if err != nil { if err != nil {
err = transformStartContainerError(err) err = transformStartContainerError(err)
return fmt.Errorf("failed to start container %q: %v", containerID, err) return nil, fmt.Errorf("failed to start container %q: %v", r.ContainerId, err)
} }
return nil return &runtimeapi.StartContainerResponse{}, nil
} }
// StopContainer stops a running container with a grace period (i.e., timeout). // StopContainer stops a running container with a grace period (i.e., timeout).
func (ds *dockerService) StopContainer(containerID string, timeout int64) error { func (ds *dockerService) StopContainer(_ context.Context, r *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) {
return ds.client.StopContainer(containerID, time.Duration(timeout)*time.Second) err := ds.client.StopContainer(r.ContainerId, time.Duration(r.Timeout)*time.Second)
if err != nil {
return nil, err
}
return &runtimeapi.StopContainerResponse{}, nil
} }
// RemoveContainer removes the container. // RemoveContainer removes the container.
func (ds *dockerService) RemoveContainer(containerID string) error { func (ds *dockerService) RemoveContainer(_ context.Context, r *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) {
// Ideally, log lifecycle should be independent of container lifecycle. // Ideally, log lifecycle should be independent of container lifecycle.
// However, docker will remove container log after container is removed, // However, docker will remove container log after container is removed,
// we can't prevent that now, so we also clean up the symlink here. // we can't prevent that now, so we also clean up the symlink here.
err := ds.removeContainerLogSymlink(containerID) err := ds.removeContainerLogSymlink(r.ContainerId)
if err != nil { if err != nil {
return err return nil, err
} }
err = ds.client.RemoveContainer(containerID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true, Force: true}) err = ds.client.RemoveContainer(r.ContainerId, dockertypes.ContainerRemoveOptions{RemoveVolumes: true, Force: true})
if err != nil { if err != nil {
return fmt.Errorf("failed to remove container %q: %v", containerID, err) return nil, fmt.Errorf("failed to remove container %q: %v", r.ContainerId, err)
} }
return nil return &runtimeapi.RemoveContainerResponse{}, nil
} }
func getContainerTimestamps(r *dockertypes.ContainerJSON) (time.Time, time.Time, time.Time, error) { func getContainerTimestamps(r *dockertypes.ContainerJSON) (time.Time, time.Time, time.Time, error) {
@ -290,7 +301,8 @@ func getContainerTimestamps(r *dockertypes.ContainerJSON) (time.Time, time.Time,
} }
// ContainerStatus inspects the docker container and returns the status. // ContainerStatus inspects the docker container and returns the status.
func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) { func (ds *dockerService) ContainerStatus(_ context.Context, req *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) {
containerID := req.ContainerId
r, err := ds.client.InspectContainer(containerID) r, err := ds.client.InspectContainer(containerID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -373,7 +385,7 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.Contai
if len(ir.RepoTags) > 0 { if len(ir.RepoTags) > 0 {
imageName = ir.RepoTags[0] imageName = ir.RepoTags[0]
} }
return &runtimeapi.ContainerStatus{ status := &runtimeapi.ContainerStatus{
Id: r.ID, Id: r.ID,
Metadata: metadata, Metadata: metadata,
Image: &runtimeapi.ImageSpec{Image: imageName}, Image: &runtimeapi.ImageSpec{Image: imageName},
@ -389,10 +401,12 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.Contai
Labels: labels, Labels: labels,
Annotations: annotations, Annotations: annotations,
LogPath: r.Config.Labels[containerLogPathLabelKey], LogPath: r.Config.Labels[containerLogPathLabelKey],
}, nil }
return &runtimeapi.ContainerStatusResponse{Status: status}, nil
} }
func (ds *dockerService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error { func (ds *dockerService) UpdateContainerResources(_ context.Context, r *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) {
resources := r.Linux
updateConfig := dockercontainer.UpdateConfig{ updateConfig := dockercontainer.UpdateConfig{
Resources: dockercontainer.Resources{ Resources: dockercontainer.Resources{
CPUPeriod: resources.CpuPeriod, CPUPeriod: resources.CpuPeriod,
@ -404,9 +418,9 @@ func (ds *dockerService) UpdateContainerResources(containerID string, resources
}, },
} }
err := ds.client.UpdateContainerResources(containerID, updateConfig) err := ds.client.UpdateContainerResources(r.ContainerId, updateConfig)
if err != nil { if err != nil {
return fmt.Errorf("failed to update container %q: %v", containerID, err) return nil, fmt.Errorf("failed to update container %q: %v", r.ContainerId, err)
} }
return nil return &runtimeapi.UpdateContainerResourcesResponse{}, nil
} }

View File

@ -26,6 +26,7 @@ import (
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
@ -44,6 +45,10 @@ func makeContainerConfig(sConfig *runtimeapi.PodSandboxConfig, name, image strin
} }
} }
func getTestCTX() context.Context {
return context.Background()
}
// TestListContainers creates several containers and then list them to check // TestListContainers creates several containers and then list them to check
// whether the correct metadatas, states, and labels are returned. // whether the correct metadatas, states, and labels are returned.
func TestListContainers(t *testing.T) { func TestListContainers(t *testing.T) {
@ -70,10 +75,12 @@ func TestListContainers(t *testing.T) {
for i := range configs { for i := range configs {
// We don't care about the sandbox id; pass a bogus one. // We don't care about the sandbox id; pass a bogus one.
sandboxID := fmt.Sprintf("sandboxid%d", i) sandboxID := fmt.Sprintf("sandboxid%d", i)
id, err := ds.CreateContainer(sandboxID, configs[i], sConfigs[i]) req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxID, Config: configs[i], SandboxConfig: sConfigs[i]}
assert.NoError(t, err) createResp, err := ds.CreateContainer(getTestCTX(), req)
err = ds.StartContainer(id) require.NoError(t, err)
assert.NoError(t, err) id := createResp.ContainerId
_, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id})
require.NoError(t, err)
imageRef := "" // FakeDockerClient doesn't populate ImageRef yet. imageRef := "" // FakeDockerClient doesn't populate ImageRef yet.
// Prepend to the expected list because ListContainers returns // Prepend to the expected list because ListContainers returns
@ -90,10 +97,10 @@ func TestListContainers(t *testing.T) {
Annotations: configs[i].Annotations, Annotations: configs[i].Annotations,
}}, expected...) }}, expected...)
} }
containers, err := ds.ListContainers(nil) listResp, err := ds.ListContainers(getTestCTX(), &runtimeapi.ListContainersRequest{})
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, containers, len(expected)) assert.Len(t, listResp.Containers, len(expected))
assert.Equal(t, expected, containers) assert.Equal(t, expected, listResp.Containers)
} }
// TestContainerStatus tests the basic lifecycle operations and verify that // TestContainerStatus tests the basic lifecycle operations and verify that
@ -137,31 +144,36 @@ func TestContainerStatus(t *testing.T) {
fClock.SetTime(time.Now().Add(-1 * time.Hour)) fClock.SetTime(time.Now().Add(-1 * time.Hour))
expected.CreatedAt = fClock.Now().UnixNano() expected.CreatedAt = fClock.Now().UnixNano()
const sandboxId = "sandboxid" const sandboxId = "sandboxid"
id, err := ds.CreateContainer(sandboxId, config, sConfig)
assert.NoError(t, err) req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig}
createResp, err := ds.CreateContainer(getTestCTX(), req)
require.NoError(t, err)
id := createResp.ContainerId
// Check internal labels // Check internal labels
c, err := fDocker.InspectContainer(id) c, err := fDocker.InspectContainer(id)
assert.NoError(t, err) require.NoError(t, err)
assert.Equal(t, c.Config.Labels[containerTypeLabelKey], containerTypeLabelContainer) assert.Equal(t, c.Config.Labels[containerTypeLabelKey], containerTypeLabelContainer)
assert.Equal(t, c.Config.Labels[sandboxIDLabelKey], sandboxId) assert.Equal(t, c.Config.Labels[sandboxIDLabelKey], sandboxId)
// Set the id manually since we don't know the id until it's created. // Set the id manually since we don't know the id until it's created.
expected.Id = id expected.Id = id
assert.NoError(t, err) assert.NoError(t, err)
status, err := ds.ContainerStatus(id) resp, err := ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id})
assert.NoError(t, err) require.NoError(t, err)
assert.Equal(t, expected, status) assert.Equal(t, expected, resp.Status)
// Advance the clock and start the container. // Advance the clock and start the container.
fClock.SetTime(time.Now()) fClock.SetTime(time.Now())
expected.StartedAt = fClock.Now().UnixNano() expected.StartedAt = fClock.Now().UnixNano()
expected.State = runtimeapi.ContainerState_CONTAINER_RUNNING expected.State = runtimeapi.ContainerState_CONTAINER_RUNNING
err = ds.StartContainer(id) _, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id})
assert.NoError(t, err) require.NoError(t, err)
status, err = ds.ContainerStatus(id)
assert.Equal(t, expected, status) resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id})
require.NoError(t, err)
assert.Equal(t, expected, resp.Status)
// Advance the clock and stop the container. // Advance the clock and stop the container.
fClock.SetTime(time.Now().Add(1 * time.Hour)) fClock.SetTime(time.Now().Add(1 * time.Hour))
@ -169,16 +181,17 @@ func TestContainerStatus(t *testing.T) {
expected.State = runtimeapi.ContainerState_CONTAINER_EXITED expected.State = runtimeapi.ContainerState_CONTAINER_EXITED
expected.Reason = "Completed" expected.Reason = "Completed"
err = ds.StopContainer(id, 0) _, err = ds.StopContainer(getTestCTX(), &runtimeapi.StopContainerRequest{ContainerId: id, Timeout: int64(0)})
assert.NoError(t, err) assert.NoError(t, err)
status, err = ds.ContainerStatus(id) resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id})
assert.Equal(t, expected, status) require.NoError(t, err)
assert.Equal(t, expected, resp.Status)
// Remove the container. // Remove the container.
err = ds.RemoveContainer(id) _, err = ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id})
assert.NoError(t, err) require.NoError(t, err)
status, err = ds.ContainerStatus(id) resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id})
assert.Error(t, err, fmt.Sprintf("status of container: %+v", status)) assert.Error(t, err, fmt.Sprintf("status of container: %+v", resp))
} }
// TestContainerLogPath tests the container log creation logic. // TestContainerLogPath tests the container log creation logic.
@ -193,7 +206,10 @@ func TestContainerLogPath(t *testing.T) {
config.LogPath = containerLogPath config.LogPath = containerLogPath
const sandboxId = "sandboxid" const sandboxId = "sandboxid"
id, err := ds.CreateContainer(sandboxId, config, sConfig) req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig}
createResp, err := ds.CreateContainer(getTestCTX(), req)
require.NoError(t, err)
id := createResp.ContainerId
// Check internal container log label // Check internal container log label
c, err := fDocker.InspectContainer(id) c, err := fDocker.InspectContainer(id)
@ -211,16 +227,16 @@ func TestContainerLogPath(t *testing.T) {
assert.Equal(t, kubeletContainerLogPath, newname) assert.Equal(t, kubeletContainerLogPath, newname)
return nil return nil
} }
err = ds.StartContainer(id) _, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id})
assert.NoError(t, err) require.NoError(t, err)
err = ds.StopContainer(id, 0) _, err = ds.StopContainer(getTestCTX(), &runtimeapi.StopContainerRequest{ContainerId: id, Timeout: int64(0)})
assert.NoError(t, err) require.NoError(t, err)
// Verify container log symlink deletion // Verify container log symlink deletion
// symlink is also tentatively deleted at startup // symlink is also tentatively deleted at startup
err = ds.RemoveContainer(id) _, err = ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id})
assert.NoError(t, err) require.NoError(t, err)
assert.Equal(t, []string{kubeletContainerLogPath, kubeletContainerLogPath}, fakeOS.Removes) assert.Equal(t, []string{kubeletContainerLogPath, kubeletContainerLogPath}, fakeOS.Removes)
} }
@ -280,11 +296,13 @@ func TestContainerCreationConflict(t *testing.T) {
if test.removeError != nil { if test.removeError != nil {
fDocker.InjectError("remove", test.removeError) fDocker.InjectError("remove", test.removeError)
} }
id, err := ds.CreateContainer(sandboxId, config, sConfig)
req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig}
createResp, err := ds.CreateContainer(getTestCTX(), req)
require.Equal(t, test.expectError, err) require.Equal(t, test.expectError, err)
assert.NoError(t, fDocker.AssertCalls(test.expectCalls)) assert.NoError(t, fDocker.AssertCalls(test.expectCalls))
if err == nil { if err == nil {
c, err := fDocker.InspectContainer(id) c, err := fDocker.InspectContainer(createResp.ContainerId)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, strings.Split(c.Name, nameDelimiter), test.expectFields) assert.Len(t, strings.Split(c.Name, nameDelimiter), test.expectFields)
} }

View File

@ -23,6 +23,7 @@ import (
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
dockerfilters "github.com/docker/docker/api/types/filters" dockerfilters "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/jsonmessage"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
@ -31,7 +32,8 @@ import (
// This file implements methods in ImageManagerService. // This file implements methods in ImageManagerService.
// ListImages lists existing images. // ListImages lists existing images.
func (ds *dockerService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) { func (ds *dockerService) ListImages(_ context.Context, r *runtimeapi.ListImagesRequest) (*runtimeapi.ListImagesResponse, error) {
filter := r.GetFilter()
opts := dockertypes.ImageListOptions{} opts := dockertypes.ImageListOptions{}
if filter != nil { if filter != nil {
if filter.GetImage().GetImage() != "" { if filter.GetImage().GetImage() != "" {
@ -54,24 +56,35 @@ func (ds *dockerService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimea
} }
result = append(result, apiImage) result = append(result, apiImage)
} }
return result, nil return &runtimeapi.ListImagesResponse{Images: result}, nil
} }
// ImageStatus returns the status of the image, returns nil if the image doesn't present. // ImageStatus returns the status of the image, returns nil if the image doesn't present.
func (ds *dockerService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) { func (ds *dockerService) ImageStatus(_ context.Context, r *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error) {
image := r.GetImage()
imageInspect, err := ds.client.InspectImageByRef(image.Image) imageInspect, err := ds.client.InspectImageByRef(image.Image)
if err != nil { if err != nil {
if libdocker.IsImageNotFoundError(err) { if libdocker.IsImageNotFoundError(err) {
return nil, nil return &runtimeapi.ImageStatusResponse{}, nil
} }
return nil, err return nil, err
} }
return imageInspectToRuntimeAPIImage(imageInspect)
imageStatus, err := imageInspectToRuntimeAPIImage(imageInspect)
if err != nil {
return nil, err
}
return &runtimeapi.ImageStatusResponse{Image: imageStatus}, nil
} }
// PullImage pulls an image with authentication config. // PullImage pulls an image with authentication config.
func (ds *dockerService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) { func (ds *dockerService) PullImage(_ context.Context, r *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error) {
image := r.GetImage()
auth := r.GetAuth()
authConfig := dockertypes.AuthConfig{} authConfig := dockertypes.AuthConfig{}
if auth != nil { if auth != nil {
authConfig.Username = auth.Username authConfig.Username = auth.Username
authConfig.Password = auth.Password authConfig.Password = auth.Password
@ -84,14 +97,20 @@ func (ds *dockerService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi
dockertypes.ImagePullOptions{}, dockertypes.ImagePullOptions{},
) )
if err != nil { if err != nil {
return "", filterHTTPError(err, image.Image) return nil, filterHTTPError(err, image.Image)
} }
return getImageRef(ds.client, image.Image) imageRef, err := getImageRef(ds.client, image.Image)
if err != nil {
return nil, err
}
return &runtimeapi.PullImageResponse{ImageRef: imageRef}, nil
} }
// RemoveImage removes the image. // RemoveImage removes the image.
func (ds *dockerService) RemoveImage(image *runtimeapi.ImageSpec) error { func (ds *dockerService) RemoveImage(_ context.Context, r *runtimeapi.RemoveImageRequest) (*runtimeapi.RemoveImageResponse, error) {
image := r.GetImage()
// If the image has multiple tags, we need to remove all the tags // If the image has multiple tags, we need to remove all the tags
// TODO: We assume image.Image is image ID here, which is true in the current implementation // TODO: We assume image.Image is image ID here, which is true in the current implementation
// of kubelet, but we should still clarify this in CRI. // of kubelet, but we should still clarify this in CRI.
@ -99,22 +118,22 @@ func (ds *dockerService) RemoveImage(image *runtimeapi.ImageSpec) error {
if err == nil && imageInspect != nil && len(imageInspect.RepoTags) > 1 { if err == nil && imageInspect != nil && len(imageInspect.RepoTags) > 1 {
for _, tag := range imageInspect.RepoTags { for _, tag := range imageInspect.RepoTags {
if _, err := ds.client.RemoveImage(tag, dockertypes.ImageRemoveOptions{PruneChildren: true}); err != nil && !libdocker.IsImageNotFoundError(err) { if _, err := ds.client.RemoveImage(tag, dockertypes.ImageRemoveOptions{PruneChildren: true}); err != nil && !libdocker.IsImageNotFoundError(err) {
return err return nil, err
} }
} }
return nil return &runtimeapi.RemoveImageResponse{}, nil
} }
// dockerclient.InspectImageByID doesn't work with digest and repoTags, // dockerclient.InspectImageByID doesn't work with digest and repoTags,
// it is safe to continue removing it since there is another check below. // it is safe to continue removing it since there is another check below.
if err != nil && !libdocker.IsImageNotFoundError(err) { if err != nil && !libdocker.IsImageNotFoundError(err) {
return err return nil, err
} }
_, err = ds.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true}) _, err = ds.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true})
if err != nil && !libdocker.IsImageNotFoundError(err) { if err != nil && !libdocker.IsImageNotFoundError(err) {
return err return nil, err
} }
return nil return &runtimeapi.RemoveImageResponse{}, nil
} }
// getImageRef returns the image digest if exists, or else returns the image ID. // getImageRef returns the image digest if exists, or else returns the image ID.

View File

@ -21,10 +21,12 @@ package dockershim
import ( import (
"fmt" "fmt"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ImageFsInfo returns information of the filesystem that is used to store images. // ImageFsInfo returns information of the filesystem that is used to store images.
func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { func (ds *dockerService) ImageFsInfo(_ context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }

View File

@ -23,6 +23,7 @@ import (
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/jsonmessage"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
@ -32,7 +33,7 @@ func TestRemoveImage(t *testing.T) {
ds, fakeDocker, _ := newTestDockerService() ds, fakeDocker, _ := newTestDockerService()
id := "1111" id := "1111"
fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo"}}}) fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo"}}})
ds.RemoveImage(&runtimeapi.ImageSpec{Image: id}) ds.RemoveImage(getTestCTX(), &runtimeapi.RemoveImageRequest{Image: &runtimeapi.ImageSpec{Image: id}})
fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil), fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil),
libdocker.NewCalledDetail("remove_image", []interface{}{id, dockertypes.ImageRemoveOptions{PruneChildren: true}})) libdocker.NewCalledDetail("remove_image", []interface{}{id, dockertypes.ImageRemoveOptions{PruneChildren: true}}))
} }
@ -41,7 +42,7 @@ func TestRemoveImageWithMultipleTags(t *testing.T) {
ds, fakeDocker, _ := newTestDockerService() ds, fakeDocker, _ := newTestDockerService()
id := "1111" id := "1111"
fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo", "bar"}}}) fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo", "bar"}}})
ds.RemoveImage(&runtimeapi.ImageSpec{Image: id}) ds.RemoveImage(getTestCTX(), &runtimeapi.RemoveImageRequest{Image: &runtimeapi.ImageSpec{Image: id}})
fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil), fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil),
libdocker.NewCalledDetail("remove_image", []interface{}{"foo", dockertypes.ImageRemoveOptions{PruneChildren: true}}), libdocker.NewCalledDetail("remove_image", []interface{}{"foo", dockertypes.ImageRemoveOptions{PruneChildren: true}}),
libdocker.NewCalledDetail("remove_image", []interface{}{"bar", dockertypes.ImageRemoveOptions{PruneChildren: true}})) libdocker.NewCalledDetail("remove_image", []interface{}{"bar", dockertypes.ImageRemoveOptions{PruneChildren: true}}))
@ -67,8 +68,8 @@ func TestPullWithJSONError(t *testing.T) {
} }
for key, test := range tests { for key, test := range tests {
fakeDocker.InjectError("pull", test.err) fakeDocker.InjectError("pull", test.err)
_, err := ds.PullImage(test.image, &runtimeapi.AuthConfig{}) _, err := ds.PullImage(getTestCTX(), &runtimeapi.PullImageRequest{Image: test.image, Auth: &runtimeapi.AuthConfig{}})
assert.Error(t, err, fmt.Sprintf("TestCase [%s]", key)) require.Error(t, err, fmt.Sprintf("TestCase [%s]", key))
assert.Contains(t, err.Error(), test.expectedError) assert.Contains(t, err.Error(), test.expectedError)
} }
} }

View File

@ -21,10 +21,12 @@ package dockershim
import ( import (
"fmt" "fmt"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ImageFsInfo returns information of the filesystem that is used to store images. // ImageFsInfo returns information of the filesystem that is used to store images.
func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { func (ds *dockerService) ImageFsInfo(_ context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }

View File

@ -21,11 +21,13 @@ package dockershim
import ( import (
"time" "time"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ImageFsInfo returns information of the filesystem that is used to store images. // ImageFsInfo returns information of the filesystem that is used to store images.
func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { func (ds *dockerService) ImageFsInfo(_ context.Context, _ *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) {
// For Windows Stats to work correctly, a file system must be provided. For now, provide a fake filesystem. // For Windows Stats to work correctly, a file system must be provided. For now, provide a fake filesystem.
filesystems := []*runtimeapi.FilesystemUsage{ filesystems := []*runtimeapi.FilesystemUsage{
{ {
@ -35,5 +37,5 @@ func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) {
}, },
} }
return filesystems, nil return &runtimeapi.ImageFsInfoResponse{ImageFilesystems: filesystems}, nil
} }

View File

@ -0,0 +1,123 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockershim
import (
"fmt"
"io"
"strconv"
"time"
"github.com/armon/circbuf"
dockertypes "github.com/docker/docker/api/types"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
)
// DockerLegacyService interface embeds some legacy methods for backward compatibility.
// This file/interface will be removed in the near future. Do not modify or add
// more functions.
type DockerLegacyService interface {
// GetContainerLogs gets logs for a specific container.
GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error
// IsCRISupportedLogDriver checks whether the logging driver used by docker is
// suppoted by native CRI integration.
// TODO(resouer): remove this when deprecating unsupported log driver
IsCRISupportedLogDriver() (bool, error)
kuberuntime.LegacyLogProvider
}
// GetContainerLogs get container logs directly from docker daemon.
func (d *dockerService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
container, err := d.client.InspectContainer(containerID.ID)
if err != nil {
return err
}
var since int64
if logOptions.SinceSeconds != nil {
t := metav1.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second)
since = t.Unix()
}
if logOptions.SinceTime != nil {
since = logOptions.SinceTime.Unix()
}
opts := dockertypes.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Since: strconv.FormatInt(since, 10),
Timestamps: logOptions.Timestamps,
Follow: logOptions.Follow,
}
if logOptions.TailLines != nil {
opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10)
}
sopts := libdocker.StreamOptions{
OutputStream: stdout,
ErrorStream: stderr,
RawTerminal: container.Config.Tty,
}
return d.client.Logs(containerID.ID, opts, sopts)
}
// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength
// from the end of the log when docker is configured with a log driver other than json-log.
// It reads up to MaxContainerTerminationMessageLogLines lines.
func (d *dockerService) GetContainerLogTail(uid kubetypes.UID, name, namespace string, containerId kubecontainer.ContainerID) (string, error) {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
// Although this is not a full spec pod, dockerLegacyService.GetContainerLogs() currently completely ignores its pod param
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: uid,
Name: name,
Namespace: namespace,
},
}
err := d.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf)
if err != nil {
return "", err
}
return buf.String(), nil
}
// criSupportedLogDrivers are log drivers supported by native CRI integration.
var criSupportedLogDrivers = []string{"json-file"}
// IsCRISupportedLogDriver checks whether the logging driver used by docker is
// suppoted by native CRI integration.
func (d *dockerService) IsCRISupportedLogDriver() (bool, error) {
info, err := d.client.Info()
if err != nil {
return false, fmt.Errorf("failed to get docker info: %v", err)
}
for _, driver := range criSupportedLogDrivers {
if info.LoggingDriver == driver {
return true, nil
}
}
return false, nil
}

View File

@ -26,6 +26,7 @@ import (
dockercontainer "github.com/docker/docker/api/types/container" dockercontainer "github.com/docker/docker/api/types/container"
dockerfilters "github.com/docker/docker/api/types/filters" dockerfilters "github.com/docker/docker/api/types/filters"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@ -75,7 +76,9 @@ func (ds *dockerService) clearNetworkReady(podSandboxID string) {
// For docker, PodSandbox is implemented by a container holding the network // For docker, PodSandbox is implemented by a container holding the network
// namespace for the pod. // namespace for the pod.
// Note: docker doesn't use LogDirectory (yet). // Note: docker doesn't use LogDirectory (yet).
func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) { func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
config := r.GetConfig()
// Step 1: Pull the image for the sandbox. // Step 1: Pull the image for the sandbox.
image := defaultSandboxImage image := defaultSandboxImage
podSandboxImage := ds.podSandboxImage podSandboxImage := ds.podSandboxImage
@ -87,13 +90,13 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
// see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
// Only pull sandbox image when it's not present - v1.PullIfNotPresent. // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
if err := ensureSandboxImageExists(ds.client, image); err != nil { if err := ensureSandboxImageExists(ds.client, image); err != nil {
return "", err return nil, err
} }
// Step 2: Create the sandbox container. // Step 2: Create the sandbox container.
createConfig, err := ds.makeSandboxDockerConfig(config, image) createConfig, err := ds.makeSandboxDockerConfig(config, image)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
} }
createResp, err := ds.client.CreateContainer(*createConfig) createResp, err := ds.client.CreateContainer(*createConfig)
if err != nil { if err != nil {
@ -101,8 +104,9 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
} }
if err != nil || createResp == nil { if err != nil || createResp == nil {
return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
} }
resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}
ds.setNetworkReady(createResp.ID, false) ds.setNetworkReady(createResp.ID, false)
defer func(e *error) { defer func(e *error) {
@ -115,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
// Step 3: Create Sandbox Checkpoint. // Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return createResp.ID, err return nil, err
} }
// Step 4: Start the sandbox container. // Step 4: Start the sandbox container.
@ -123,7 +127,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
// startContainer failed. // startContainer failed.
err = ds.client.StartContainer(createResp.ID) err = ds.client.StartContainer(createResp.ID)
if err != nil { if err != nil {
return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
} }
// Rewrite resolv.conf file generated by docker. // Rewrite resolv.conf file generated by docker.
@ -135,17 +139,17 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
containerInfo, err := ds.client.InspectContainer(createResp.ID) containerInfo, err := ds.client.InspectContainer(createResp.ID)
if err != nil { if err != nil {
return createResp.ID, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
} }
if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
return createResp.ID, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
} }
} }
// Do not invoke network plugins if in hostNetwork mode. // Do not invoke network plugins if in hostNetwork mode.
if nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions(); nsOptions != nil && nsOptions.HostNetwork { if nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions(); nsOptions != nil && nsOptions.HostNetwork {
return createResp.ID, nil return resp, nil
} }
// Step 5: Setup networking for the sandbox. // Step 5: Setup networking for the sandbox.
@ -163,7 +167,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err) glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)
} }
} }
return createResp.ID, err return resp, err
} }
// StopPodSandbox stops the sandbox. If there are any running containers in the // StopPodSandbox stops the sandbox. If there are any running containers in the
@ -171,13 +175,17 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id
// TODO: This function blocks sandbox teardown on networking teardown. Is it // TODO: This function blocks sandbox teardown on networking teardown. Is it
// better to cut our losses assuming an out of band GC routine will cleanup // better to cut our losses assuming an out of band GC routine will cleanup
// after us? // after us?
func (ds *dockerService) StopPodSandbox(podSandboxID string) error { func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
var namespace, name string var namespace, name string
var hostNetwork bool var hostNetwork bool
var checkpointErr, statusErr error var checkpointErr, statusErr error
podSandboxID := r.PodSandboxId
resp := &runtimeapi.StopPodSandboxResponse{}
// Try to retrieve sandbox information from docker daemon or sandbox checkpoint // Try to retrieve sandbox information from docker daemon or sandbox checkpoint
status, statusErr := ds.PodSandboxStatus(podSandboxID) statusResp, statusErr := ds.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandboxID})
status := statusResp.GetStatus()
if statusErr == nil { if statusErr == nil {
nsOpts := status.GetLinux().GetNamespaces().GetOptions() nsOpts := status.GetLinux().GetNamespaces().GetOptions()
hostNetwork = nsOpts != nil && nsOpts.HostNetwork hostNetwork = nsOpts != nil && nsOpts.HostNetwork
@ -196,7 +204,7 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
glog.Warningf("Both sandbox container and checkpoint for id %q could not be found. "+ glog.Warningf("Both sandbox container and checkpoint for id %q could not be found. "+
"Proceed without further sandbox information.", podSandboxID) "Proceed without further sandbox information.", podSandboxID)
} else { } else {
return utilerrors.NewAggregate([]error{ return nil, utilerrors.NewAggregate([]error{
fmt.Errorf("failed to get checkpoint for sandbox %q: %v", podSandboxID, checkpointErr), fmt.Errorf("failed to get checkpoint for sandbox %q: %v", podSandboxID, checkpointErr),
fmt.Errorf("failed to get sandbox status: %v", statusErr)}) fmt.Errorf("failed to get sandbox status: %v", statusErr)})
} }
@ -237,14 +245,21 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
ds.checkpointHandler.RemoveCheckpoint(podSandboxID) ds.checkpointHandler.RemoveCheckpoint(podSandboxID)
} }
} }
return utilerrors.NewAggregate(errList)
if len(errList) == 0 {
return resp, nil
}
// TODO: Stop all running containers in the sandbox. // TODO: Stop all running containers in the sandbox.
return nil, utilerrors.NewAggregate(errList)
} }
// RemovePodSandbox removes the sandbox. If there are running containers in the // RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed. // sandbox, they should be forcibly removed.
func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) {
podSandboxID := r.PodSandboxId
var errs []error var errs []error
opts := dockertypes.ContainerListOptions{All: true} opts := dockertypes.ContainerListOptions{All: true}
opts.Filters = dockerfilters.NewArgs() opts.Filters = dockerfilters.NewArgs()
@ -258,7 +273,7 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error {
// Remove all containers in the sandbox. // Remove all containers in the sandbox.
for i := range containers { for i := range containers {
if err := ds.RemoveContainer(containers[i].ID); err != nil && !libdocker.IsContainerNotFoundError(err) { if _, err := ds.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{ContainerId: containers[i].ID}); err != nil && !libdocker.IsContainerNotFoundError(err) {
errs = append(errs, err) errs = append(errs, err)
} }
} }
@ -277,7 +292,10 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error {
if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
return utilerrors.NewAggregate(errs) if len(errs) == 0 {
return &runtimeapi.RemovePodSandboxResponse{}, nil
}
return nil, utilerrors.NewAggregate(errs)
} }
// getIPFromPlugin interrogates the network plugin for an IP. // getIPFromPlugin interrogates the network plugin for an IP.
@ -342,7 +360,9 @@ func (ds *dockerService) getIP(podSandboxID string, sandbox *dockertypes.Contain
} }
// PodSandboxStatus returns the status of the PodSandbox. // PodSandboxStatus returns the status of the PodSandbox.
func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error) { func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) {
podSandboxID := req.PodSandboxId
// Inspect the container. // Inspect the container.
r, err := ds.client.InspectContainer(podSandboxID) r, err := ds.client.InspectContainer(podSandboxID)
if err != nil { if err != nil {
@ -375,7 +395,7 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodS
return nil, err return nil, err
} }
labels, annotations := extractLabels(r.Config.Labels) labels, annotations := extractLabels(r.Config.Labels)
return &runtimeapi.PodSandboxStatus{ status := &runtimeapi.PodSandboxStatus{
Id: r.ID, Id: r.ID,
State: state, State: state,
CreatedAt: ct, CreatedAt: ct,
@ -394,11 +414,14 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodS
}, },
}, },
}, },
}, nil }
return &runtimeapi.PodSandboxStatusResponse{Status: status}, nil
} }
// ListPodSandbox returns a list of Sandbox. // ListPodSandbox returns a list of Sandbox.
func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) { func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) {
filter := r.GetFilter()
// By default, list all containers whether they are running or not. // By default, list all containers whether they are running or not.
opts := dockertypes.ContainerListOptions{All: true} opts := dockertypes.ContainerListOptions{All: true}
filterOutReadySandboxes := false filterOutReadySandboxes := false
@ -482,7 +505,7 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]
result = append(result, checkpointToRuntimeAPISandbox(id, checkpoint)) result = append(result, checkpointToRuntimeAPISandbox(id, checkpoint))
} }
return result, nil return &runtimeapi.ListPodSandboxResponse{Items: result}, nil
} }
// applySandboxLinuxOptions applies LinuxPodSandboxConfig to dockercontainer.HostConfig and dockercontainer.ContainerCreateConfig. // applySandboxLinuxOptions applies LinuxPodSandboxConfig to dockercontainer.HostConfig and dockercontainer.ContainerCreateConfig.

View File

@ -24,6 +24,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -69,23 +70,23 @@ func TestListSandboxes(t *testing.T) {
state := runtimeapi.PodSandboxState_SANDBOX_READY state := runtimeapi.PodSandboxState_SANDBOX_READY
var createdAt int64 = fakeClock.Now().UnixNano() var createdAt int64 = fakeClock.Now().UnixNano()
for i := range configs { for i := range configs {
id, err := ds.RunPodSandbox(configs[i]) runResp, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: configs[i]})
assert.NoError(t, err) require.NoError(t, err)
// Prepend to the expected list because ListPodSandbox returns // Prepend to the expected list because ListPodSandbox returns
// the most recent sandbox first. // the most recent sandbox first.
expected = append([]*runtimeapi.PodSandbox{{ expected = append([]*runtimeapi.PodSandbox{{
Metadata: configs[i].Metadata, Metadata: configs[i].Metadata,
Id: id, Id: runResp.PodSandboxId,
State: state, State: state,
CreatedAt: createdAt, CreatedAt: createdAt,
Labels: configs[i].Labels, Labels: configs[i].Labels,
Annotations: configs[i].Annotations, Annotations: configs[i].Annotations,
}}, expected...) }}, expected...)
} }
sandboxes, err := ds.ListPodSandbox(nil) listResp, err := ds.ListPodSandbox(getTestCTX(), &runtimeapi.ListPodSandboxRequest{})
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, sandboxes, len(expected)) assert.Len(t, listResp.Items, len(expected))
assert.Equal(t, expected, sandboxes) assert.Equal(t, expected, listResp.Items)
} }
// TestSandboxStatus tests the basic lifecycle operations and verify that // TestSandboxStatus tests the basic lifecycle operations and verify that
@ -116,7 +117,9 @@ func TestSandboxStatus(t *testing.T) {
// Create the sandbox. // Create the sandbox.
fClock.SetTime(time.Now()) fClock.SetTime(time.Now())
expected.CreatedAt = fClock.Now().UnixNano() expected.CreatedAt = fClock.Now().UnixNano()
id, err := ds.RunPodSandbox(config) runResp, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: config})
require.NoError(t, err)
id := runResp.PodSandboxId
// Check internal labels // Check internal labels
c, err := fDocker.InspectContainer(id) c, err := fDocker.InspectContainer(id)
@ -125,24 +128,25 @@ func TestSandboxStatus(t *testing.T) {
assert.Equal(t, c.Config.Labels[types.KubernetesContainerNameLabel], sandboxContainerName) assert.Equal(t, c.Config.Labels[types.KubernetesContainerNameLabel], sandboxContainerName)
expected.Id = id // ID is only known after the creation. expected.Id = id // ID is only known after the creation.
status, err := ds.PodSandboxStatus(id) statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id})
assert.NoError(t, err) require.NoError(t, err)
assert.Equal(t, expected, status) assert.Equal(t, expected, statusResp.Status)
// Stop the sandbox. // Stop the sandbox.
expected.State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY expected.State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY
err = ds.StopPodSandbox(id) _, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: id})
assert.NoError(t, err) require.NoError(t, err)
// IP not valid after sandbox stop // IP not valid after sandbox stop
expected.Network.Ip = "" expected.Network.Ip = ""
status, err = ds.PodSandboxStatus(id) statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id})
assert.Equal(t, expected, status) require.NoError(t, err)
assert.Equal(t, expected, statusResp.Status)
// Remove the container. // Remove the container.
err = ds.RemovePodSandbox(id) _, err = ds.RemovePodSandbox(getTestCTX(), &runtimeapi.RemovePodSandboxRequest{PodSandboxId: id})
assert.NoError(t, err) require.NoError(t, err)
status, err = ds.PodSandboxStatus(id) statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id})
assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", status)) assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", statusResp))
} }
// TestSandboxStatusAfterRestart tests that retrieving sandbox status returns // TestSandboxStatusAfterRestart tests that retrieving sandbox status returns
@ -183,9 +187,10 @@ func TestSandboxStatusAfterRestart(t *testing.T) {
// Check status without RunPodSandbox() having set up networking // Check status without RunPodSandbox() having set up networking
expected.Id = createResp.ID // ID is only known after the creation. expected.Id = createResp.ID // ID is only known after the creation.
status, err := ds.PodSandboxStatus(createResp.ID)
assert.NoError(t, err) statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: createResp.ID})
assert.Equal(t, expected, status) require.NoError(t, err)
assert.Equal(t, expected, statusResp.Status)
} }
// TestNetworkPluginInvocation checks that the right SetUpPod and TearDownPod // TestNetworkPluginInvocation checks that the right SetUpPod and TearDownPod
@ -212,10 +217,10 @@ func TestNetworkPluginInvocation(t *testing.T) {
mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID) mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID)
mockPlugin.EXPECT().TearDownPod(ns, name, cID).After(setup) mockPlugin.EXPECT().TearDownPod(ns, name, cID).After(setup)
_, err := ds.RunPodSandbox(c) _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c})
assert.NoError(t, err) require.NoError(t, err)
err = ds.StopPodSandbox(cID.ID) _, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: cID.ID})
assert.NoError(t, err) require.NoError(t, err)
} }
// TestHostNetworkPluginInvocation checks that *no* SetUp/TearDown calls happen // TestHostNetworkPluginInvocation checks that *no* SetUp/TearDown calls happen
@ -244,9 +249,11 @@ func TestHostNetworkPluginInvocation(t *testing.T) {
cID := kubecontainer.ContainerID{Type: runtimeName, ID: libdocker.GetFakeContainerID(fmt.Sprintf("/%v", makeSandboxName(c)))} cID := kubecontainer.ContainerID{Type: runtimeName, ID: libdocker.GetFakeContainerID(fmt.Sprintf("/%v", makeSandboxName(c)))}
// No calls to network plugin are expected // No calls to network plugin are expected
_, err := ds.RunPodSandbox(c) _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c})
assert.NoError(t, err) require.NoError(t, err)
assert.NoError(t, ds.StopPodSandbox(cID.ID))
_, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: cID.ID})
require.NoError(t, err)
} }
// TestSetUpPodFailure checks that the sandbox should be not ready when it // TestSetUpPodFailure checks that the sandbox should be not ready when it
@ -271,19 +278,19 @@ func TestSetUpPodFailure(t *testing.T) {
mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID).Return(&network.PodNetworkStatus{IP: net.IP("127.0.0.01")}, nil).AnyTimes() mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID).Return(&network.PodNetworkStatus{IP: net.IP("127.0.0.01")}, nil).AnyTimes()
t.Logf("RunPodSandbox should return error") t.Logf("RunPodSandbox should return error")
_, err := ds.RunPodSandbox(c) _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c})
assert.Error(t, err) assert.Error(t, err)
t.Logf("PodSandboxStatus should be not ready") t.Logf("PodSandboxStatus should be not ready")
status, err := ds.PodSandboxStatus(cID.ID) statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: cID.ID})
assert.NoError(t, err) require.NoError(t, err)
assert.Equal(t, runtimeapi.PodSandboxState_SANDBOX_NOTREADY, status.State) assert.Equal(t, runtimeapi.PodSandboxState_SANDBOX_NOTREADY, statusResp.Status.State)
t.Logf("ListPodSandbox should also show not ready") t.Logf("ListPodSandbox should also show not ready")
sandboxes, err := ds.ListPodSandbox(nil) listResp, err := ds.ListPodSandbox(getTestCTX(), &runtimeapi.ListPodSandboxRequest{})
assert.NoError(t, err) require.NoError(t, err)
var sandbox *runtimeapi.PodSandbox var sandbox *runtimeapi.PodSandbox
for _, s := range sandboxes { for _, s := range listResp.Items {
if s.Id == cID.ID { if s.Id == cID.ID {
sandbox = s sandbox = s
break break

View File

@ -18,21 +18,16 @@ package dockershim
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"strconv"
"sync" "sync"
"time" "time"
"github.com/armon/circbuf"
"github.com/blang/semver" "github.com/blang/semver"
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
@ -86,6 +81,25 @@ const (
// to kubelet behavior and system settings in addition to any API flags that may be introduced. // to kubelet behavior and system settings in addition to any API flags that may be introduced.
) )
// CRIService includes all methods necessary for a CRI server.
type CRIService interface {
runtimeapi.RuntimeServiceServer
runtimeapi.ImageServiceServer
Start() error
}
// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
CRIService
// For serving streaming calls.
http.Handler
// For supporting legacy features.
DockerLegacyService
}
// NetworkPluginSettings is the subset of kubelet runtime args we pass // NetworkPluginSettings is the subset of kubelet runtime args we pass
// to the container runtime shim so it can probe for network plugins. // to the container runtime shim so it can probe for network plugins.
// In the future we will feed these directly to a standalone container // In the future we will feed these directly to a standalone container
@ -262,25 +276,6 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
return ds, nil return ds, nil
} }
// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
internalapi.RuntimeService
internalapi.ImageManagerService
Start() error
// For serving streaming calls.
http.Handler
// IsCRISupportedLogDriver checks whether the logging driver used by docker is
// suppoted by native CRI integration.
// TODO(resouer): remove this when deprecating unsupported log driver
IsCRISupportedLogDriver() (bool, error)
// NewDockerLegacyService created docker legacy service when log driver is not supported.
// TODO(resouer): remove this when deprecating unsupported log driver
NewDockerLegacyService() DockerLegacyService
}
type dockerService struct { type dockerService struct {
client libdocker.Interface client libdocker.Interface
os kubecontainer.OSInterface os kubecontainer.OSInterface
@ -309,8 +304,10 @@ type dockerService struct {
disableSharedPID bool disableSharedPID bool
} }
// TODO: handle context.
// Version returns the runtime name, runtime version and runtime API version // Version returns the runtime name, runtime version and runtime API version
func (ds *dockerService) Version(_ string) (*runtimeapi.VersionResponse, error) { func (ds *dockerService) Version(_ context.Context, r *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) {
v, err := ds.getDockerVersion() v, err := ds.getDockerVersion()
if err != nil { if err != nil {
return nil, err return nil, err
@ -336,17 +333,20 @@ func (ds *dockerService) getDockerVersion() (*dockertypes.Version, error) {
} }
// UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates. // UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates.
func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) (err error) { func (ds *dockerService) UpdateRuntimeConfig(_ context.Context, r *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) {
runtimeConfig := r.GetRuntimeConfig()
if runtimeConfig == nil { if runtimeConfig == nil {
return return &runtimeapi.UpdateRuntimeConfigResponse{}, nil
} }
glog.Infof("docker cri received runtime config %+v", runtimeConfig) glog.Infof("docker cri received runtime config %+v", runtimeConfig)
if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" { if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" {
event := make(map[string]interface{}) event := make(map[string]interface{})
event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr
ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event)
} }
return
return &runtimeapi.UpdateRuntimeConfigResponse{}, nil
} }
// GetNetNS returns the network namespace of the given containerID. The ID // GetNetNS returns the network namespace of the given containerID. The ID
@ -392,7 +392,7 @@ func (ds *dockerService) Start() error {
// Status returns the status of the runtime. // Status returns the status of the runtime.
// TODO(random-liu): Set network condition accordingly here. // TODO(random-liu): Set network condition accordingly here.
func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) { func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) {
runtimeReady := &runtimeapi.RuntimeCondition{ runtimeReady := &runtimeapi.RuntimeCondition{
Type: runtimeapi.RuntimeReady, Type: runtimeapi.RuntimeReady,
Status: true, Status: true,
@ -412,7 +412,8 @@ func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) {
networkReady.Reason = "NetworkPluginNotReady" networkReady.Reason = "NetworkPluginNotReady"
networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err) networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err)
} }
return &runtimeapi.RuntimeStatus{Conditions: conditions}, nil status := &runtimeapi.RuntimeStatus{Conditions: conditions}
return &runtimeapi.StatusResponse{Status: status}, nil
} }
func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -507,103 +508,3 @@ func toAPIProtocol(protocol Protocol) v1.Protocol {
glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol) glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol)
return v1.ProtocolTCP return v1.ProtocolTCP
} }
// DockerLegacyService interface embeds some legacy methods for backward compatibility.
type DockerLegacyService interface {
// GetContainerLogs gets logs for a specific container.
GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error
}
// dockerLegacyService implements the DockerLegacyService. We add this for non json-log driver
// support. (See #41996)
type dockerLegacyService struct {
client libdocker.Interface
}
// NewDockerLegacyService created docker legacy service when log driver is not supported.
// TODO(resouer): remove this when deprecating unsupported log driver
func (d *dockerService) NewDockerLegacyService() DockerLegacyService {
return &dockerLegacyService{client: d.client}
}
// GetContainerLogs get container logs directly from docker daemon.
func (d *dockerLegacyService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
container, err := d.client.InspectContainer(containerID.ID)
if err != nil {
return err
}
var since int64
if logOptions.SinceSeconds != nil {
t := metav1.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second)
since = t.Unix()
}
if logOptions.SinceTime != nil {
since = logOptions.SinceTime.Unix()
}
opts := dockertypes.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Since: strconv.FormatInt(since, 10),
Timestamps: logOptions.Timestamps,
Follow: logOptions.Follow,
}
if logOptions.TailLines != nil {
opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10)
}
sopts := libdocker.StreamOptions{
OutputStream: stdout,
ErrorStream: stderr,
RawTerminal: container.Config.Tty,
}
return d.client.Logs(containerID.ID, opts, sopts)
}
// LegacyLogProvider implements the kuberuntime.LegacyLogProvider interface
type LegacyLogProvider struct {
dls DockerLegacyService
}
func NewLegacyLogProvider(dls DockerLegacyService) LegacyLogProvider {
return LegacyLogProvider{dls: dls}
}
// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength
// from the end of the log when docker is configured with a log driver other than json-log.
// It reads up to MaxContainerTerminationMessageLogLines lines.
func (l LegacyLogProvider) GetContainerLogTail(uid kubetypes.UID, name, namespace string, containerId kubecontainer.ContainerID) (string, error) {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
// Although this is not a full spec pod, dockerLegacyService.GetContainerLogs() currently completely ignores its pod param
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: uid,
Name: name,
Namespace: namespace,
},
}
err := l.dls.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf)
if err != nil {
return "", err
}
return buf.String(), nil
}
// criSupportedLogDrivers are log drivers supported by native CRI integration.
var criSupportedLogDrivers = []string{"json-file"}
// IsCRISupportedLogDriver checks whether the logging driver used by docker is
// suppoted by native CRI integration.
func (d *dockerService) IsCRISupportedLogDriver() (bool, error) {
info, err := d.client.Info()
if err != nil {
return false, fmt.Errorf("failed to get docker info: %v", err)
}
for _, driver := range criSupportedLogDrivers {
if info.LoggingDriver == driver {
return true, nil
}
}
return false, nil
}

View File

@ -83,33 +83,33 @@ func TestStatus(t *testing.T) {
} }
// Should report ready status if version returns no error. // Should report ready status if version returns no error.
status, err := ds.Status() statusResp, err := ds.Status(getTestCTX(), &runtimeapi.StatusRequest{})
assert.NoError(t, err) require.NoError(t, err)
assertStatus(map[string]bool{ assertStatus(map[string]bool{
runtimeapi.RuntimeReady: true, runtimeapi.RuntimeReady: true,
runtimeapi.NetworkReady: true, runtimeapi.NetworkReady: true,
}, status) }, statusResp.Status)
// Should not report ready status if version returns error. // Should not report ready status if version returns error.
fDocker.InjectError("version", errors.New("test error")) fDocker.InjectError("version", errors.New("test error"))
status, err = ds.Status() statusResp, err = ds.Status(getTestCTX(), &runtimeapi.StatusRequest{})
assert.NoError(t, err) assert.NoError(t, err)
assertStatus(map[string]bool{ assertStatus(map[string]bool{
runtimeapi.RuntimeReady: false, runtimeapi.RuntimeReady: false,
runtimeapi.NetworkReady: true, runtimeapi.NetworkReady: true,
}, status) }, statusResp.Status)
// Should not report ready status is network plugin returns error. // Should not report ready status is network plugin returns error.
mockPlugin := newTestNetworkPlugin(t) mockPlugin := newTestNetworkPlugin(t)
ds.network = network.NewPluginManager(mockPlugin) ds.network = network.NewPluginManager(mockPlugin)
defer mockPlugin.Finish() defer mockPlugin.Finish()
mockPlugin.EXPECT().Status().Return(errors.New("network error")) mockPlugin.EXPECT().Status().Return(errors.New("network error"))
status, err = ds.Status() statusResp, err = ds.Status(getTestCTX(), &runtimeapi.StatusRequest{})
assert.NoError(t, err) assert.NoError(t, err)
assertStatus(map[string]bool{ assertStatus(map[string]bool{
runtimeapi.RuntimeReady: true, runtimeapi.RuntimeReady: true,
runtimeapi.NetworkReady: false, runtimeapi.NetworkReady: false,
}, status) }, statusResp.Status)
} }
func TestVersion(t *testing.T) { func TestVersion(t *testing.T) {

View File

@ -20,15 +20,17 @@ package dockershim
import ( import (
"fmt" "fmt"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ContainerStats returns stats for a container stats request based on container id. // ContainerStats returns stats for a container stats request based on container id.
func (ds *dockerService) ContainerStats(string) (*runtimeapi.ContainerStats, error) { func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
// ListContainerStats returns stats for a list container stats request based on a filter. // ListContainerStats returns stats for a list container stats request based on a filter.
func (ds *dockerService) ListContainerStats(*runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { func (ds *dockerService) ListContainerStats(_ context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }

View File

@ -21,15 +21,17 @@ package dockershim
import ( import (
"fmt" "fmt"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ContainerStats returns stats for a container stats request based on container id. // ContainerStats returns stats for a container stats request based on container id.
func (ds *dockerService) ContainerStats(string) (*runtimeapi.ContainerStats, error) { func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
// ListContainerStats returns stats for a list container stats request based on a filter. // ListContainerStats returns stats for a list container stats request based on a filter.
func (ds *dockerService) ListContainerStats(*runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { func (ds *dockerService) ListContainerStats(_ context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }

View File

@ -21,20 +21,23 @@ package dockershim
import ( import (
"time" "time"
"golang.org/x/net/context"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// ContainerStats returns stats for a container stats request based on container id. // ContainerStats returns stats for a container stats request based on container id.
func (ds *dockerService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
containerStats, err := ds.getContainerStats(containerID) stats, err := ds.getContainerStats(r.ContainerId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return containerStats, nil return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil
} }
// ListContainerStats returns stats for a list container stats request based on a filter. // ListContainerStats returns stats for a list container stats request based on a filter.
func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { func (ds *dockerService) ListContainerStats(ctx context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
containerStatsFilter := r.GetFilter()
filter := &runtimeapi.ContainerFilter{} filter := &runtimeapi.ContainerFilter{}
if containerStatsFilter != nil { if containerStatsFilter != nil {
@ -43,13 +46,13 @@ func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.Con
filter.LabelSelector = containerStatsFilter.LabelSelector filter.LabelSelector = containerStatsFilter.LabelSelector
} }
containers, err := ds.ListContainers(filter) listResp, err := ds.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
if err != nil { if err != nil {
return nil, err return nil, err
} }
var stats []*runtimeapi.ContainerStats var stats []*runtimeapi.ContainerStats
for _, container := range containers { for _, container := range listResp.Containers {
containerStats, err := ds.getContainerStats(container.Id) containerStats, err := ds.getContainerStats(container.Id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -58,7 +61,7 @@ func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.Con
stats = append(stats, containerStats) stats = append(stats, containerStats)
} }
return stats, nil return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil
} }
func (ds *dockerService) getContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { func (ds *dockerService) getContainerStats(containerID string) (*runtimeapi.ContainerStats, error) {
@ -72,10 +75,11 @@ func (ds *dockerService) getContainerStats(containerID string) (*runtimeapi.Cont
return nil, err return nil, err
} }
status, err := ds.ContainerStatus(containerID) statusResp, err := ds.ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{ContainerId: containerID})
if err != nil { if err != nil {
return nil, err return nil, err
} }
status := statusResp.GetStatus()
dockerStats := statsJSON.Stats dockerStats := statsJSON.Stats
timestamp := time.Now().UnixNano() timestamp := time.Now().UnixNano()

View File

@ -26,14 +26,15 @@ import (
"time" "time"
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/kubelet/util/ioutils"
utilexec "k8s.io/utils/exec"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
) )
@ -76,20 +77,35 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i
// ExecSync executes a command in the container, and returns the stdout output. // ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned. // If command exits with a non-zero exit code, an error is returned.
func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
timeout := time.Duration(req.Timeout) * time.Second
var stdoutBuffer, stderrBuffer bytes.Buffer var stdoutBuffer, stderrBuffer bytes.Buffer
err = ds.streamingRuntime.exec(containerID, cmd, err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd,
nil, // in nil, // in
ioutils.WriteCloserWrapper(&stdoutBuffer), ioutils.WriteCloserWrapper(&stdoutBuffer),
ioutils.WriteCloserWrapper(&stderrBuffer), ioutils.WriteCloserWrapper(&stderrBuffer),
false, // tty false, // tty
nil, // resize nil, // resize
timeout) timeout)
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
var exitCode int32
if err != nil {
exitError, ok := err.(utilexec.ExitError)
if !ok {
return nil, err
}
exitCode = int32(exitError.ExitStatus())
}
return &runtimeapi.ExecSyncResponse{
Stdout: stdoutBuffer.Bytes(),
Stderr: stderrBuffer.Bytes(),
ExitCode: exitCode,
}, nil
} }
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("exec") return nil, streaming.ErrorStreamingDisabled("exec")
} }
@ -101,7 +117,7 @@ func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResp
} }
// Attach prepares a streaming endpoint to attach to a running container, and returns the address. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("attach") return nil, streaming.ErrorStreamingDisabled("attach")
} }
@ -113,7 +129,7 @@ func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.Atta
} }
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if ds.streamingServer == nil { if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("port forward") return nil, streaming.ErrorStreamingDisabled("port forward")
} }

View File

@ -7,21 +7,15 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["docker_server.go"],
"docker_server.go",
"docker_service.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/remote", importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/remote",
deps = [ deps = [
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/util:go_default_library", "//pkg/kubelet/util:go_default_library",
"//pkg/util/interrupt:go_default_library", "//pkg/util/interrupt:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library", "//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
], ],
) )

View File

@ -33,21 +33,27 @@ type DockerServer struct {
// endpoint is the endpoint to serve on. // endpoint is the endpoint to serve on.
endpoint string endpoint string
// service is the docker service which implements runtime and image services. // service is the docker service which implements runtime and image services.
service DockerService service dockershim.CRIService
// server is the grpc server. // server is the grpc server.
server *grpc.Server server *grpc.Server
} }
// NewDockerServer creates the dockershim grpc server. // NewDockerServer creates the dockershim grpc server.
func NewDockerServer(endpoint string, s dockershim.DockerService) *DockerServer { func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
return &DockerServer{ return &DockerServer{
endpoint: endpoint, endpoint: endpoint,
service: NewDockerService(s), service: s,
} }
} }
// Start starts the dockershim grpc server. // Start starts the dockershim grpc server.
func (s *DockerServer) Start() error { func (s *DockerServer) Start() error {
// Start the internal service.
if err := s.service.Start(); err != nil {
glog.Errorf("Unable to start docker service")
return err
}
glog.V(2).Infof("Start dockershim grpc server") glog.V(2).Infof("Start dockershim grpc server")
l, err := util.CreateListener(s.endpoint) l, err := util.CreateListener(s.endpoint)
if err != nil { if err != nil {

View File

@ -1,249 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"time"
"golang.org/x/net/context"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
utilexec "k8s.io/utils/exec"
)
// DockerService is the interface implement CRI remote service server.
type DockerService interface {
runtimeapi.RuntimeServiceServer
runtimeapi.ImageServiceServer
}
// dockerService uses dockershim service to implement DockerService.
// Notice that the contexts in the functions are not used now.
// TODO(random-liu): Change the dockershim service to support context, and implement
// internal services and remote services with the dockershim service.
type dockerService struct {
runtimeService internalapi.RuntimeService
imageService internalapi.ImageManagerService
}
func NewDockerService(s dockershim.DockerService) DockerService {
return &dockerService{runtimeService: s, imageService: s}
}
func (d *dockerService) Version(ctx context.Context, r *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) {
return d.runtimeService.Version(r.Version)
}
func (d *dockerService) Status(ctx context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) {
status, err := d.runtimeService.Status()
if err != nil {
return nil, err
}
return &runtimeapi.StatusResponse{Status: status}, nil
}
func (d *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
podSandboxId, err := d.runtimeService.RunPodSandbox(r.GetConfig())
if err != nil {
return nil, err
}
return &runtimeapi.RunPodSandboxResponse{PodSandboxId: podSandboxId}, nil
}
func (d *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
err := d.runtimeService.StopPodSandbox(r.PodSandboxId)
if err != nil {
return nil, err
}
return &runtimeapi.StopPodSandboxResponse{}, nil
}
func (d *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) {
err := d.runtimeService.RemovePodSandbox(r.PodSandboxId)
if err != nil {
return nil, err
}
return &runtimeapi.RemovePodSandboxResponse{}, nil
}
func (d *dockerService) PodSandboxStatus(ctx context.Context, r *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) {
podSandboxStatus, err := d.runtimeService.PodSandboxStatus(r.PodSandboxId)
if err != nil {
return nil, err
}
return &runtimeapi.PodSandboxStatusResponse{Status: podSandboxStatus}, nil
}
func (d *dockerService) ListPodSandbox(ctx context.Context, r *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) {
items, err := d.runtimeService.ListPodSandbox(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeapi.ListPodSandboxResponse{Items: items}, nil
}
func (d *dockerService) CreateContainer(ctx context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) {
containerId, err := d.runtimeService.CreateContainer(r.PodSandboxId, r.GetConfig(), r.GetSandboxConfig())
if err != nil {
return nil, err
}
return &runtimeapi.CreateContainerResponse{ContainerId: containerId}, nil
}
func (d *dockerService) StartContainer(ctx context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) {
err := d.runtimeService.StartContainer(r.ContainerId)
if err != nil {
return nil, err
}
return &runtimeapi.StartContainerResponse{}, nil
}
func (d *dockerService) StopContainer(ctx context.Context, r *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) {
err := d.runtimeService.StopContainer(r.ContainerId, r.Timeout)
if err != nil {
return nil, err
}
return &runtimeapi.StopContainerResponse{}, nil
}
func (d *dockerService) RemoveContainer(ctx context.Context, r *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) {
err := d.runtimeService.RemoveContainer(r.ContainerId)
if err != nil {
return nil, err
}
return &runtimeapi.RemoveContainerResponse{}, nil
}
func (d *dockerService) ListContainers(ctx context.Context, r *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) {
containers, err := d.runtimeService.ListContainers(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeapi.ListContainersResponse{Containers: containers}, nil
}
func (d *dockerService) ContainerStatus(ctx context.Context, r *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) {
status, err := d.runtimeService.ContainerStatus(r.ContainerId)
if err != nil {
return nil, err
}
return &runtimeapi.ContainerStatusResponse{Status: status}, nil
}
func (d *dockerService) UpdateContainerResources(ctx context.Context, r *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) {
err := d.runtimeService.UpdateContainerResources(r.ContainerId, r.Linux)
if err != nil {
return nil, err
}
return &runtimeapi.UpdateContainerResourcesResponse{}, nil
}
func (d *dockerService) ExecSync(ctx context.Context, r *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
stdout, stderr, err := d.runtimeService.ExecSync(r.ContainerId, r.Cmd, time.Duration(r.Timeout)*time.Second)
var exitCode int32
if err != nil {
exitError, ok := err.(utilexec.ExitError)
if !ok {
return nil, err
}
exitCode = int32(exitError.ExitStatus())
}
return &runtimeapi.ExecSyncResponse{
Stdout: stdout,
Stderr: stderr,
ExitCode: exitCode,
}, nil
}
func (d *dockerService) Exec(ctx context.Context, r *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
return d.runtimeService.Exec(r)
}
func (d *dockerService) Attach(ctx context.Context, r *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
return d.runtimeService.Attach(r)
}
func (d *dockerService) PortForward(ctx context.Context, r *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
return d.runtimeService.PortForward(r)
}
func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) {
err := d.runtimeService.UpdateRuntimeConfig(r.GetRuntimeConfig())
if err != nil {
return nil, err
}
return &runtimeapi.UpdateRuntimeConfigResponse{}, nil
}
func (d *dockerService) ListImages(ctx context.Context, r *runtimeapi.ListImagesRequest) (*runtimeapi.ListImagesResponse, error) {
images, err := d.imageService.ListImages(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeapi.ListImagesResponse{Images: images}, nil
}
func (d *dockerService) ImageStatus(ctx context.Context, r *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error) {
image, err := d.imageService.ImageStatus(r.GetImage())
if err != nil {
return nil, err
}
return &runtimeapi.ImageStatusResponse{Image: image}, nil
}
func (d *dockerService) PullImage(ctx context.Context, r *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error) {
image, err := d.imageService.PullImage(r.GetImage(), r.GetAuth())
if err != nil {
return nil, err
}
return &runtimeapi.PullImageResponse{ImageRef: image}, nil
}
func (d *dockerService) RemoveImage(ctx context.Context, r *runtimeapi.RemoveImageRequest) (*runtimeapi.RemoveImageResponse, error) {
err := d.imageService.RemoveImage(r.GetImage())
if err != nil {
return nil, err
}
return &runtimeapi.RemoveImageResponse{}, nil
}
// ImageFsInfo returns information of the filesystem that is used to store images.
func (d *dockerService) ImageFsInfo(ctx context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) {
filesystems, err := d.imageService.ImageFsInfo()
if err != nil {
return nil, err
}
return &runtimeapi.ImageFsInfoResponse{ImageFilesystems: filesystems}, nil
}
func (d *dockerService) ContainerStats(ctx context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
stats, err := d.runtimeService.ContainerStats(r.ContainerId)
if err != nil {
return nil, err
}
return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil
}
func (d *dockerService) ListContainerStats(ctx context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
stats, err := d.runtimeService.ListContainerStats(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil
}

View File

@ -610,9 +610,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := ds.Start(); err != nil {
return nil, err
}
// For now, the CRI shim redirects the streaming requests to the // For now, the CRI shim redirects the streaming requests to the
// kubelet, which handles the requests using DockerService.. // kubelet, which handles the requests using DockerService..
klet.criHandler = ds klet.criHandler = ds
@ -633,8 +630,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err return nil, err
} }
if !supported { if !supported {
klet.dockerLegacyService = ds.NewDockerLegacyService() klet.dockerLegacyService = ds
legacyLogProvider = dockershim.NewLegacyLogProvider(klet.dockerLegacyService) legacyLogProvider = ds
} }
case kubetypes.RemoteContainerRuntime: case kubetypes.RemoteContainerRuntime:
// No-op. // No-op.