diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index da8fe6fb594..c302dfc4074 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -998,10 +998,6 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf if err != nil { return err } - if err := ds.Start(); err != nil { - return err - } - glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) if err := server.Start(); err != nil { diff --git a/hack/.golint_failures b/hack/.golint_failures index 92f25955ec8..577741f9eee 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -178,7 +178,6 @@ pkg/kubelet/custommetrics pkg/kubelet/dockershim pkg/kubelet/dockershim/cm pkg/kubelet/dockershim/libdocker -pkg/kubelet/dockershim/remote pkg/kubelet/dockershim/testing pkg/kubelet/events pkg/kubelet/gpu diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index a201ddd57e6..8412e41480f 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -14,6 +14,7 @@ go_library( "docker_checkpoint.go", "docker_container.go", "docker_image.go", + "docker_legacy_service.go", "docker_sandbox.go", "docker_service.go", "docker_streaming.go", @@ -83,7 +84,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim", deps = [ "//pkg/credentialprovider:go_default_library", - "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cm:go_default_library", @@ -91,6 +91,7 @@ go_library( "//pkg/kubelet/dockershim/cm:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/dockershim/metrics:go_default_library", + "//pkg/kubelet/kuberuntime:go_default_library", "//pkg/kubelet/leaky:go_default_library", "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/cni:go_default_library", @@ -115,11 +116,13 @@ go_library( "//vendor/github.com/docker/docker/pkg/jsonmessage:go_default_library", "//vendor/github.com/docker/go-connections/nat:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", ], ) @@ -165,6 +168,7 @@ go_test( "//vendor/github.com/golang/mock/gomock:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", ], ) diff --git a/pkg/kubelet/dockershim/docker_container.go b/pkg/kubelet/dockershim/docker_container.go index 33a05556dfd..522649845c2 100644 --- a/pkg/kubelet/dockershim/docker_container.go +++ b/pkg/kubelet/dockershim/docker_container.go @@ -27,13 +27,15 @@ import ( dockerfilters "github.com/docker/docker/api/types/filters" dockerstrslice "github.com/docker/docker/api/types/strslice" "github.com/golang/glog" + "golang.org/x/net/context" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" ) // ListContainers lists all containers matching the filter. -func (ds *dockerService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) { +func (ds *dockerService) ListContainers(_ context.Context, r *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) { + filter := r.GetFilter() opts := dockertypes.ContainerListOptions{All: true} opts.Filters = dockerfilters.NewArgs() @@ -75,19 +77,24 @@ func (ds *dockerService) ListContainers(filter *runtimeapi.ContainerFilter) ([]* result = append(result, converted) } - return result, nil + + return &runtimeapi.ListContainersResponse{Containers: result}, nil } // CreateContainer creates a new container in the given PodSandbox // Docker cannot store the log to an arbitrary location (yet), so we create an // symlink at LogPath, linking to the actual path of the log. // TODO: check if the default values returned by the runtime API are ok. -func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) { +func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) { + podSandboxID := r.PodSandboxId + config := r.GetConfig() + sandboxConfig := r.GetSandboxConfig() + if config == nil { - return "", fmt.Errorf("container config is nil") + return nil, fmt.Errorf("container config is nil") } if sandboxConfig == nil { - return "", fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name) + return nil, fmt.Errorf("sandbox config is nil for container %q", config.Metadata.Name) } labels := makeLabels(config.GetLabels(), config.GetAnnotations()) @@ -100,7 +107,7 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi apiVersion, err := ds.getDockerAPIVersion() if err != nil { - return "", fmt.Errorf("unable to get the docker API version: %v", err) + return nil, fmt.Errorf("unable to get the docker API version: %v", err) } image := "" @@ -147,7 +154,7 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSeparator) if err != nil { - return "", fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err) } hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...) @@ -158,9 +165,9 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeapi } if createResp != nil { - return createResp.ID, err + return &runtimeapi.CreateContainerResponse{ContainerId: createResp.ID}, nil } - return "", err + return nil, err } // getContainerLogPath returns the container log path specified by kubelet and the real @@ -229,45 +236,49 @@ func (ds *dockerService) removeContainerLogSymlink(containerID string) error { } // StartContainer starts the container. -func (ds *dockerService) StartContainer(containerID string) error { - err := ds.client.StartContainer(containerID) +func (ds *dockerService) StartContainer(_ context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) { + err := ds.client.StartContainer(r.ContainerId) // Create container log symlink for all containers (including failed ones). - if linkError := ds.createContainerLogSymlink(containerID); linkError != nil { + if linkError := ds.createContainerLogSymlink(r.ContainerId); linkError != nil { // Do not stop the container if we failed to create symlink because: // 1. This is not a critical failure. // 2. We don't have enough information to properly stop container here. // Kubelet will surface this error to user via an event. - return linkError + return nil, linkError } if err != nil { err = transformStartContainerError(err) - return fmt.Errorf("failed to start container %q: %v", containerID, err) + return nil, fmt.Errorf("failed to start container %q: %v", r.ContainerId, err) } - return nil + return &runtimeapi.StartContainerResponse{}, nil } // StopContainer stops a running container with a grace period (i.e., timeout). -func (ds *dockerService) StopContainer(containerID string, timeout int64) error { - return ds.client.StopContainer(containerID, time.Duration(timeout)*time.Second) +func (ds *dockerService) StopContainer(_ context.Context, r *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) { + err := ds.client.StopContainer(r.ContainerId, time.Duration(r.Timeout)*time.Second) + if err != nil { + return nil, err + } + return &runtimeapi.StopContainerResponse{}, nil } // RemoveContainer removes the container. -func (ds *dockerService) RemoveContainer(containerID string) error { +func (ds *dockerService) RemoveContainer(_ context.Context, r *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) { // Ideally, log lifecycle should be independent of container lifecycle. // However, docker will remove container log after container is removed, // we can't prevent that now, so we also clean up the symlink here. - err := ds.removeContainerLogSymlink(containerID) + err := ds.removeContainerLogSymlink(r.ContainerId) if err != nil { - return err + return nil, err } - err = ds.client.RemoveContainer(containerID, dockertypes.ContainerRemoveOptions{RemoveVolumes: true, Force: true}) + err = ds.client.RemoveContainer(r.ContainerId, dockertypes.ContainerRemoveOptions{RemoveVolumes: true, Force: true}) if err != nil { - return fmt.Errorf("failed to remove container %q: %v", containerID, err) + return nil, fmt.Errorf("failed to remove container %q: %v", r.ContainerId, err) } - return nil + return &runtimeapi.RemoveContainerResponse{}, nil } func getContainerTimestamps(r *dockertypes.ContainerJSON) (time.Time, time.Time, time.Time, error) { @@ -290,7 +301,8 @@ func getContainerTimestamps(r *dockertypes.ContainerJSON) (time.Time, time.Time, } // ContainerStatus inspects the docker container and returns the status. -func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) { +func (ds *dockerService) ContainerStatus(_ context.Context, req *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) { + containerID := req.ContainerId r, err := ds.client.InspectContainer(containerID) if err != nil { return nil, err @@ -373,7 +385,7 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.Contai if len(ir.RepoTags) > 0 { imageName = ir.RepoTags[0] } - return &runtimeapi.ContainerStatus{ + status := &runtimeapi.ContainerStatus{ Id: r.ID, Metadata: metadata, Image: &runtimeapi.ImageSpec{Image: imageName}, @@ -389,10 +401,12 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeapi.Contai Labels: labels, Annotations: annotations, LogPath: r.Config.Labels[containerLogPathLabelKey], - }, nil + } + return &runtimeapi.ContainerStatusResponse{Status: status}, nil } -func (ds *dockerService) UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error { +func (ds *dockerService) UpdateContainerResources(_ context.Context, r *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) { + resources := r.Linux updateConfig := dockercontainer.UpdateConfig{ Resources: dockercontainer.Resources{ CPUPeriod: resources.CpuPeriod, @@ -404,9 +418,9 @@ func (ds *dockerService) UpdateContainerResources(containerID string, resources }, } - err := ds.client.UpdateContainerResources(containerID, updateConfig) + err := ds.client.UpdateContainerResources(r.ContainerId, updateConfig) if err != nil { - return fmt.Errorf("failed to update container %q: %v", containerID, err) + return nil, fmt.Errorf("failed to update container %q: %v", r.ContainerId, err) } - return nil + return &runtimeapi.UpdateContainerResourcesResponse{}, nil } diff --git a/pkg/kubelet/dockershim/docker_container_test.go b/pkg/kubelet/dockershim/docker_container_test.go index ed99cc27752..c96e6e970de 100644 --- a/pkg/kubelet/dockershim/docker_container_test.go +++ b/pkg/kubelet/dockershim/docker_container_test.go @@ -26,6 +26,7 @@ import ( dockertypes "github.com/docker/docker/api/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/context" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" @@ -44,6 +45,10 @@ func makeContainerConfig(sConfig *runtimeapi.PodSandboxConfig, name, image strin } } +func getTestCTX() context.Context { + return context.Background() +} + // TestListContainers creates several containers and then list them to check // whether the correct metadatas, states, and labels are returned. func TestListContainers(t *testing.T) { @@ -70,10 +75,12 @@ func TestListContainers(t *testing.T) { for i := range configs { // We don't care about the sandbox id; pass a bogus one. sandboxID := fmt.Sprintf("sandboxid%d", i) - id, err := ds.CreateContainer(sandboxID, configs[i], sConfigs[i]) - assert.NoError(t, err) - err = ds.StartContainer(id) - assert.NoError(t, err) + req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxID, Config: configs[i], SandboxConfig: sConfigs[i]} + createResp, err := ds.CreateContainer(getTestCTX(), req) + require.NoError(t, err) + id := createResp.ContainerId + _, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id}) + require.NoError(t, err) imageRef := "" // FakeDockerClient doesn't populate ImageRef yet. // Prepend to the expected list because ListContainers returns @@ -90,10 +97,10 @@ func TestListContainers(t *testing.T) { Annotations: configs[i].Annotations, }}, expected...) } - containers, err := ds.ListContainers(nil) - assert.NoError(t, err) - assert.Len(t, containers, len(expected)) - assert.Equal(t, expected, containers) + listResp, err := ds.ListContainers(getTestCTX(), &runtimeapi.ListContainersRequest{}) + require.NoError(t, err) + assert.Len(t, listResp.Containers, len(expected)) + assert.Equal(t, expected, listResp.Containers) } // TestContainerStatus tests the basic lifecycle operations and verify that @@ -137,31 +144,36 @@ func TestContainerStatus(t *testing.T) { fClock.SetTime(time.Now().Add(-1 * time.Hour)) expected.CreatedAt = fClock.Now().UnixNano() const sandboxId = "sandboxid" - id, err := ds.CreateContainer(sandboxId, config, sConfig) - assert.NoError(t, err) + + req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig} + createResp, err := ds.CreateContainer(getTestCTX(), req) + require.NoError(t, err) + id := createResp.ContainerId // Check internal labels c, err := fDocker.InspectContainer(id) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, c.Config.Labels[containerTypeLabelKey], containerTypeLabelContainer) assert.Equal(t, c.Config.Labels[sandboxIDLabelKey], sandboxId) // Set the id manually since we don't know the id until it's created. expected.Id = id assert.NoError(t, err) - status, err := ds.ContainerStatus(id) - assert.NoError(t, err) - assert.Equal(t, expected, status) + resp, err := ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id}) + require.NoError(t, err) + assert.Equal(t, expected, resp.Status) // Advance the clock and start the container. fClock.SetTime(time.Now()) expected.StartedAt = fClock.Now().UnixNano() expected.State = runtimeapi.ContainerState_CONTAINER_RUNNING - err = ds.StartContainer(id) - assert.NoError(t, err) - status, err = ds.ContainerStatus(id) - assert.Equal(t, expected, status) + _, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id}) + require.NoError(t, err) + + resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id}) + require.NoError(t, err) + assert.Equal(t, expected, resp.Status) // Advance the clock and stop the container. fClock.SetTime(time.Now().Add(1 * time.Hour)) @@ -169,16 +181,17 @@ func TestContainerStatus(t *testing.T) { expected.State = runtimeapi.ContainerState_CONTAINER_EXITED expected.Reason = "Completed" - err = ds.StopContainer(id, 0) + _, err = ds.StopContainer(getTestCTX(), &runtimeapi.StopContainerRequest{ContainerId: id, Timeout: int64(0)}) assert.NoError(t, err) - status, err = ds.ContainerStatus(id) - assert.Equal(t, expected, status) + resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id}) + require.NoError(t, err) + assert.Equal(t, expected, resp.Status) // Remove the container. - err = ds.RemoveContainer(id) - assert.NoError(t, err) - status, err = ds.ContainerStatus(id) - assert.Error(t, err, fmt.Sprintf("status of container: %+v", status)) + _, err = ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id}) + require.NoError(t, err) + resp, err = ds.ContainerStatus(getTestCTX(), &runtimeapi.ContainerStatusRequest{ContainerId: id}) + assert.Error(t, err, fmt.Sprintf("status of container: %+v", resp)) } // TestContainerLogPath tests the container log creation logic. @@ -193,7 +206,10 @@ func TestContainerLogPath(t *testing.T) { config.LogPath = containerLogPath const sandboxId = "sandboxid" - id, err := ds.CreateContainer(sandboxId, config, sConfig) + req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig} + createResp, err := ds.CreateContainer(getTestCTX(), req) + require.NoError(t, err) + id := createResp.ContainerId // Check internal container log label c, err := fDocker.InspectContainer(id) @@ -211,16 +227,16 @@ func TestContainerLogPath(t *testing.T) { assert.Equal(t, kubeletContainerLogPath, newname) return nil } - err = ds.StartContainer(id) - assert.NoError(t, err) + _, err = ds.StartContainer(getTestCTX(), &runtimeapi.StartContainerRequest{ContainerId: id}) + require.NoError(t, err) - err = ds.StopContainer(id, 0) - assert.NoError(t, err) + _, err = ds.StopContainer(getTestCTX(), &runtimeapi.StopContainerRequest{ContainerId: id, Timeout: int64(0)}) + require.NoError(t, err) // Verify container log symlink deletion // symlink is also tentatively deleted at startup - err = ds.RemoveContainer(id) - assert.NoError(t, err) + _, err = ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id}) + require.NoError(t, err) assert.Equal(t, []string{kubeletContainerLogPath, kubeletContainerLogPath}, fakeOS.Removes) } @@ -280,11 +296,13 @@ func TestContainerCreationConflict(t *testing.T) { if test.removeError != nil { fDocker.InjectError("remove", test.removeError) } - id, err := ds.CreateContainer(sandboxId, config, sConfig) + + req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxId, Config: config, SandboxConfig: sConfig} + createResp, err := ds.CreateContainer(getTestCTX(), req) require.Equal(t, test.expectError, err) assert.NoError(t, fDocker.AssertCalls(test.expectCalls)) if err == nil { - c, err := fDocker.InspectContainer(id) + c, err := fDocker.InspectContainer(createResp.ContainerId) assert.NoError(t, err) assert.Len(t, strings.Split(c.Name, nameDelimiter), test.expectFields) } diff --git a/pkg/kubelet/dockershim/docker_image.go b/pkg/kubelet/dockershim/docker_image.go index 1b39522f862..622945a8ec8 100644 --- a/pkg/kubelet/dockershim/docker_image.go +++ b/pkg/kubelet/dockershim/docker_image.go @@ -23,6 +23,7 @@ import ( dockertypes "github.com/docker/docker/api/types" dockerfilters "github.com/docker/docker/api/types/filters" "github.com/docker/docker/pkg/jsonmessage" + "golang.org/x/net/context" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" @@ -31,7 +32,8 @@ import ( // This file implements methods in ImageManagerService. // ListImages lists existing images. -func (ds *dockerService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) { +func (ds *dockerService) ListImages(_ context.Context, r *runtimeapi.ListImagesRequest) (*runtimeapi.ListImagesResponse, error) { + filter := r.GetFilter() opts := dockertypes.ImageListOptions{} if filter != nil { if filter.GetImage().GetImage() != "" { @@ -54,24 +56,35 @@ func (ds *dockerService) ListImages(filter *runtimeapi.ImageFilter) ([]*runtimea } result = append(result, apiImage) } - return result, nil + return &runtimeapi.ListImagesResponse{Images: result}, nil } // ImageStatus returns the status of the image, returns nil if the image doesn't present. -func (ds *dockerService) ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error) { +func (ds *dockerService) ImageStatus(_ context.Context, r *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error) { + image := r.GetImage() + imageInspect, err := ds.client.InspectImageByRef(image.Image) if err != nil { if libdocker.IsImageNotFoundError(err) { - return nil, nil + return &runtimeapi.ImageStatusResponse{}, nil } return nil, err } - return imageInspectToRuntimeAPIImage(imageInspect) + + imageStatus, err := imageInspectToRuntimeAPIImage(imageInspect) + if err != nil { + return nil, err + } + + return &runtimeapi.ImageStatusResponse{Image: imageStatus}, nil } // PullImage pulls an image with authentication config. -func (ds *dockerService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig) (string, error) { +func (ds *dockerService) PullImage(_ context.Context, r *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error) { + image := r.GetImage() + auth := r.GetAuth() authConfig := dockertypes.AuthConfig{} + if auth != nil { authConfig.Username = auth.Username authConfig.Password = auth.Password @@ -84,14 +97,20 @@ func (ds *dockerService) PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi dockertypes.ImagePullOptions{}, ) if err != nil { - return "", filterHTTPError(err, image.Image) + return nil, filterHTTPError(err, image.Image) } - return getImageRef(ds.client, image.Image) + imageRef, err := getImageRef(ds.client, image.Image) + if err != nil { + return nil, err + } + + return &runtimeapi.PullImageResponse{ImageRef: imageRef}, nil } // RemoveImage removes the image. -func (ds *dockerService) RemoveImage(image *runtimeapi.ImageSpec) error { +func (ds *dockerService) RemoveImage(_ context.Context, r *runtimeapi.RemoveImageRequest) (*runtimeapi.RemoveImageResponse, error) { + image := r.GetImage() // If the image has multiple tags, we need to remove all the tags // TODO: We assume image.Image is image ID here, which is true in the current implementation // of kubelet, but we should still clarify this in CRI. @@ -99,22 +118,22 @@ func (ds *dockerService) RemoveImage(image *runtimeapi.ImageSpec) error { if err == nil && imageInspect != nil && len(imageInspect.RepoTags) > 1 { for _, tag := range imageInspect.RepoTags { if _, err := ds.client.RemoveImage(tag, dockertypes.ImageRemoveOptions{PruneChildren: true}); err != nil && !libdocker.IsImageNotFoundError(err) { - return err + return nil, err } } - return nil + return &runtimeapi.RemoveImageResponse{}, nil } // dockerclient.InspectImageByID doesn't work with digest and repoTags, // it is safe to continue removing it since there is another check below. if err != nil && !libdocker.IsImageNotFoundError(err) { - return err + return nil, err } _, err = ds.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true}) if err != nil && !libdocker.IsImageNotFoundError(err) { - return err + return nil, err } - return nil + return &runtimeapi.RemoveImageResponse{}, nil } // getImageRef returns the image digest if exists, or else returns the image ID. diff --git a/pkg/kubelet/dockershim/docker_image_linux.go b/pkg/kubelet/dockershim/docker_image_linux.go index 1af785972cc..3ecc6599e92 100644 --- a/pkg/kubelet/dockershim/docker_image_linux.go +++ b/pkg/kubelet/dockershim/docker_image_linux.go @@ -21,10 +21,12 @@ package dockershim import ( "fmt" + "golang.org/x/net/context" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ImageFsInfo returns information of the filesystem that is used to store images. -func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { +func (ds *dockerService) ImageFsInfo(_ context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/dockershim/docker_image_test.go b/pkg/kubelet/dockershim/docker_image_test.go index d9e3c853244..91a502629e3 100644 --- a/pkg/kubelet/dockershim/docker_image_test.go +++ b/pkg/kubelet/dockershim/docker_image_test.go @@ -23,6 +23,7 @@ import ( dockertypes "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/jsonmessage" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" @@ -32,7 +33,7 @@ func TestRemoveImage(t *testing.T) { ds, fakeDocker, _ := newTestDockerService() id := "1111" fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo"}}}) - ds.RemoveImage(&runtimeapi.ImageSpec{Image: id}) + ds.RemoveImage(getTestCTX(), &runtimeapi.RemoveImageRequest{Image: &runtimeapi.ImageSpec{Image: id}}) fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil), libdocker.NewCalledDetail("remove_image", []interface{}{id, dockertypes.ImageRemoveOptions{PruneChildren: true}})) } @@ -41,7 +42,7 @@ func TestRemoveImageWithMultipleTags(t *testing.T) { ds, fakeDocker, _ := newTestDockerService() id := "1111" fakeDocker.InjectImageInspects([]dockertypes.ImageInspect{{ID: id, RepoTags: []string{"foo", "bar"}}}) - ds.RemoveImage(&runtimeapi.ImageSpec{Image: id}) + ds.RemoveImage(getTestCTX(), &runtimeapi.RemoveImageRequest{Image: &runtimeapi.ImageSpec{Image: id}}) fakeDocker.AssertCallDetails(libdocker.NewCalledDetail("inspect_image", nil), libdocker.NewCalledDetail("remove_image", []interface{}{"foo", dockertypes.ImageRemoveOptions{PruneChildren: true}}), libdocker.NewCalledDetail("remove_image", []interface{}{"bar", dockertypes.ImageRemoveOptions{PruneChildren: true}})) @@ -67,8 +68,8 @@ func TestPullWithJSONError(t *testing.T) { } for key, test := range tests { fakeDocker.InjectError("pull", test.err) - _, err := ds.PullImage(test.image, &runtimeapi.AuthConfig{}) - assert.Error(t, err, fmt.Sprintf("TestCase [%s]", key)) + _, err := ds.PullImage(getTestCTX(), &runtimeapi.PullImageRequest{Image: test.image, Auth: &runtimeapi.AuthConfig{}}) + require.Error(t, err, fmt.Sprintf("TestCase [%s]", key)) assert.Contains(t, err.Error(), test.expectedError) } } diff --git a/pkg/kubelet/dockershim/docker_image_unsupported.go b/pkg/kubelet/dockershim/docker_image_unsupported.go index 17519e0385f..b4fb70b3a4c 100644 --- a/pkg/kubelet/dockershim/docker_image_unsupported.go +++ b/pkg/kubelet/dockershim/docker_image_unsupported.go @@ -21,10 +21,12 @@ package dockershim import ( "fmt" + "golang.org/x/net/context" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ImageFsInfo returns information of the filesystem that is used to store images. -func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { +func (ds *dockerService) ImageFsInfo(_ context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/dockershim/docker_image_windows.go b/pkg/kubelet/dockershim/docker_image_windows.go index b147659f2b6..385b0545878 100644 --- a/pkg/kubelet/dockershim/docker_image_windows.go +++ b/pkg/kubelet/dockershim/docker_image_windows.go @@ -21,11 +21,13 @@ package dockershim import ( "time" + "golang.org/x/net/context" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ImageFsInfo returns information of the filesystem that is used to store images. -func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { +func (ds *dockerService) ImageFsInfo(_ context.Context, _ *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { // For Windows Stats to work correctly, a file system must be provided. For now, provide a fake filesystem. filesystems := []*runtimeapi.FilesystemUsage{ { @@ -35,5 +37,5 @@ func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { }, } - return filesystems, nil + return &runtimeapi.ImageFsInfoResponse{ImageFilesystems: filesystems}, nil } diff --git a/pkg/kubelet/dockershim/docker_legacy_service.go b/pkg/kubelet/dockershim/docker_legacy_service.go new file mode 100644 index 00000000000..357e673622f --- /dev/null +++ b/pkg/kubelet/dockershim/docker_legacy_service.go @@ -0,0 +1,123 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "fmt" + "io" + "strconv" + "time" + + "github.com/armon/circbuf" + dockertypes "github.com/docker/docker/api/types" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubetypes "k8s.io/apimachinery/pkg/types" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/kuberuntime" + + "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" +) + +// DockerLegacyService interface embeds some legacy methods for backward compatibility. +// This file/interface will be removed in the near future. Do not modify or add +// more functions. +type DockerLegacyService interface { + // GetContainerLogs gets logs for a specific container. + GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error + + // IsCRISupportedLogDriver checks whether the logging driver used by docker is + // suppoted by native CRI integration. + // TODO(resouer): remove this when deprecating unsupported log driver + IsCRISupportedLogDriver() (bool, error) + + kuberuntime.LegacyLogProvider +} + +// GetContainerLogs get container logs directly from docker daemon. +func (d *dockerService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { + container, err := d.client.InspectContainer(containerID.ID) + if err != nil { + return err + } + + var since int64 + if logOptions.SinceSeconds != nil { + t := metav1.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) + since = t.Unix() + } + if logOptions.SinceTime != nil { + since = logOptions.SinceTime.Unix() + } + opts := dockertypes.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Since: strconv.FormatInt(since, 10), + Timestamps: logOptions.Timestamps, + Follow: logOptions.Follow, + } + if logOptions.TailLines != nil { + opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10) + } + + sopts := libdocker.StreamOptions{ + OutputStream: stdout, + ErrorStream: stderr, + RawTerminal: container.Config.Tty, + } + return d.client.Logs(containerID.ID, opts, sopts) +} + +// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength +// from the end of the log when docker is configured with a log driver other than json-log. +// It reads up to MaxContainerTerminationMessageLogLines lines. +func (d *dockerService) GetContainerLogTail(uid kubetypes.UID, name, namespace string, containerId kubecontainer.ContainerID) (string, error) { + value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) + buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) + // Although this is not a full spec pod, dockerLegacyService.GetContainerLogs() currently completely ignores its pod param + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uid, + Name: name, + Namespace: namespace, + }, + } + err := d.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) + if err != nil { + return "", err + } + return buf.String(), nil +} + +// criSupportedLogDrivers are log drivers supported by native CRI integration. +var criSupportedLogDrivers = []string{"json-file"} + +// IsCRISupportedLogDriver checks whether the logging driver used by docker is +// suppoted by native CRI integration. +func (d *dockerService) IsCRISupportedLogDriver() (bool, error) { + info, err := d.client.Info() + if err != nil { + return false, fmt.Errorf("failed to get docker info: %v", err) + } + for _, driver := range criSupportedLogDrivers { + if info.LoggingDriver == driver { + return true, nil + } + } + return false, nil +} diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 88057311ac9..24d7879eeea 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -26,6 +26,7 @@ import ( dockercontainer "github.com/docker/docker/api/types/container" dockerfilters "github.com/docker/docker/api/types/filters" "github.com/golang/glog" + "golang.org/x/net/context" utilerrors "k8s.io/apimachinery/pkg/util/errors" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -75,7 +76,9 @@ func (ds *dockerService) clearNetworkReady(podSandboxID string) { // For docker, PodSandbox is implemented by a container holding the network // namespace for the pod. // Note: docker doesn't use LogDirectory (yet). -func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id string, err error) { +func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { + config := r.GetConfig() + // Step 1: Pull the image for the sandbox. image := defaultSandboxImage podSandboxImage := ds.podSandboxImage @@ -87,13 +90,13 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository // Only pull sandbox image when it's not present - v1.PullIfNotPresent. if err := ensureSandboxImageExists(ds.client, image); err != nil { - return "", err + return nil, err } // Step 2: Create the sandbox container. createConfig, err := ds.makeSandboxDockerConfig(config, image) if err != nil { - return "", fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) } createResp, err := ds.client.CreateContainer(*createConfig) if err != nil { @@ -101,8 +104,9 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id } if err != nil || createResp == nil { - return "", fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } + resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} ds.setNetworkReady(createResp.ID, false) defer func(e *error) { @@ -115,7 +119,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id // Step 3: Create Sandbox Checkpoint. if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { - return createResp.ID, err + return nil, err } // Step 4: Start the sandbox container. @@ -123,7 +127,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id // startContainer failed. err = ds.client.StartContainer(createResp.ID) if err != nil { - return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) } // Rewrite resolv.conf file generated by docker. @@ -135,17 +139,17 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { containerInfo, err := ds.client.InspectContainer(createResp.ID) if err != nil { - return createResp.ID, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) } if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { - return createResp.ID, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) + return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) } } // Do not invoke network plugins if in hostNetwork mode. if nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions(); nsOptions != nil && nsOptions.HostNetwork { - return createResp.ID, nil + return resp, nil } // Step 5: Setup networking for the sandbox. @@ -163,7 +167,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err) } } - return createResp.ID, err + return resp, err } // StopPodSandbox stops the sandbox. If there are any running containers in the @@ -171,13 +175,17 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (id // TODO: This function blocks sandbox teardown on networking teardown. Is it // better to cut our losses assuming an out of band GC routine will cleanup // after us? -func (ds *dockerService) StopPodSandbox(podSandboxID string) error { +func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) { var namespace, name string var hostNetwork bool var checkpointErr, statusErr error + podSandboxID := r.PodSandboxId + resp := &runtimeapi.StopPodSandboxResponse{} + // Try to retrieve sandbox information from docker daemon or sandbox checkpoint - status, statusErr := ds.PodSandboxStatus(podSandboxID) + statusResp, statusErr := ds.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{PodSandboxId: podSandboxID}) + status := statusResp.GetStatus() if statusErr == nil { nsOpts := status.GetLinux().GetNamespaces().GetOptions() hostNetwork = nsOpts != nil && nsOpts.HostNetwork @@ -196,7 +204,7 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error { glog.Warningf("Both sandbox container and checkpoint for id %q could not be found. "+ "Proceed without further sandbox information.", podSandboxID) } else { - return utilerrors.NewAggregate([]error{ + return nil, utilerrors.NewAggregate([]error{ fmt.Errorf("failed to get checkpoint for sandbox %q: %v", podSandboxID, checkpointErr), fmt.Errorf("failed to get sandbox status: %v", statusErr)}) } @@ -237,14 +245,21 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error { ds.checkpointHandler.RemoveCheckpoint(podSandboxID) } } - return utilerrors.NewAggregate(errList) + + if len(errList) == 0 { + return resp, nil + } + // TODO: Stop all running containers in the sandbox. + return nil, utilerrors.NewAggregate(errList) } // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. -func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { +func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) { + podSandboxID := r.PodSandboxId var errs []error + opts := dockertypes.ContainerListOptions{All: true} opts.Filters = dockerfilters.NewArgs() @@ -258,7 +273,7 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { // Remove all containers in the sandbox. for i := range containers { - if err := ds.RemoveContainer(containers[i].ID); err != nil && !libdocker.IsContainerNotFoundError(err) { + if _, err := ds.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{ContainerId: containers[i].ID}); err != nil && !libdocker.IsContainerNotFoundError(err) { errs = append(errs, err) } } @@ -277,7 +292,10 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { if err := ds.checkpointHandler.RemoveCheckpoint(podSandboxID); err != nil { errs = append(errs, err) } - return utilerrors.NewAggregate(errs) + if len(errs) == 0 { + return &runtimeapi.RemovePodSandboxResponse{}, nil + } + return nil, utilerrors.NewAggregate(errs) } // getIPFromPlugin interrogates the network plugin for an IP. @@ -342,7 +360,9 @@ func (ds *dockerService) getIP(podSandboxID string, sandbox *dockertypes.Contain } // PodSandboxStatus returns the status of the PodSandbox. -func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error) { +func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) { + podSandboxID := req.PodSandboxId + // Inspect the container. r, err := ds.client.InspectContainer(podSandboxID) if err != nil { @@ -375,7 +395,7 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodS return nil, err } labels, annotations := extractLabels(r.Config.Labels) - return &runtimeapi.PodSandboxStatus{ + status := &runtimeapi.PodSandboxStatus{ Id: r.ID, State: state, CreatedAt: ct, @@ -394,11 +414,14 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeapi.PodS }, }, }, - }, nil + } + return &runtimeapi.PodSandboxStatusResponse{Status: status}, nil } // ListPodSandbox returns a list of Sandbox. -func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) { +func (ds *dockerService) ListPodSandbox(_ context.Context, r *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) { + filter := r.GetFilter() + // By default, list all containers whether they are running or not. opts := dockertypes.ContainerListOptions{All: true} filterOutReadySandboxes := false @@ -482,7 +505,7 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([] result = append(result, checkpointToRuntimeAPISandbox(id, checkpoint)) } - return result, nil + return &runtimeapi.ListPodSandboxResponse{Items: result}, nil } // applySandboxLinuxOptions applies LinuxPodSandboxConfig to dockercontainer.HostConfig and dockercontainer.ContainerCreateConfig. diff --git a/pkg/kubelet/dockershim/docker_sandbox_test.go b/pkg/kubelet/dockershim/docker_sandbox_test.go index 2c30e09fa1c..77671875708 100644 --- a/pkg/kubelet/dockershim/docker_sandbox_test.go +++ b/pkg/kubelet/dockershim/docker_sandbox_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -69,23 +70,23 @@ func TestListSandboxes(t *testing.T) { state := runtimeapi.PodSandboxState_SANDBOX_READY var createdAt int64 = fakeClock.Now().UnixNano() for i := range configs { - id, err := ds.RunPodSandbox(configs[i]) - assert.NoError(t, err) + runResp, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: configs[i]}) + require.NoError(t, err) // Prepend to the expected list because ListPodSandbox returns // the most recent sandbox first. expected = append([]*runtimeapi.PodSandbox{{ Metadata: configs[i].Metadata, - Id: id, + Id: runResp.PodSandboxId, State: state, CreatedAt: createdAt, Labels: configs[i].Labels, Annotations: configs[i].Annotations, }}, expected...) } - sandboxes, err := ds.ListPodSandbox(nil) - assert.NoError(t, err) - assert.Len(t, sandboxes, len(expected)) - assert.Equal(t, expected, sandboxes) + listResp, err := ds.ListPodSandbox(getTestCTX(), &runtimeapi.ListPodSandboxRequest{}) + require.NoError(t, err) + assert.Len(t, listResp.Items, len(expected)) + assert.Equal(t, expected, listResp.Items) } // TestSandboxStatus tests the basic lifecycle operations and verify that @@ -116,7 +117,9 @@ func TestSandboxStatus(t *testing.T) { // Create the sandbox. fClock.SetTime(time.Now()) expected.CreatedAt = fClock.Now().UnixNano() - id, err := ds.RunPodSandbox(config) + runResp, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: config}) + require.NoError(t, err) + id := runResp.PodSandboxId // Check internal labels c, err := fDocker.InspectContainer(id) @@ -125,24 +128,25 @@ func TestSandboxStatus(t *testing.T) { assert.Equal(t, c.Config.Labels[types.KubernetesContainerNameLabel], sandboxContainerName) expected.Id = id // ID is only known after the creation. - status, err := ds.PodSandboxStatus(id) - assert.NoError(t, err) - assert.Equal(t, expected, status) + statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id}) + require.NoError(t, err) + assert.Equal(t, expected, statusResp.Status) // Stop the sandbox. expected.State = runtimeapi.PodSandboxState_SANDBOX_NOTREADY - err = ds.StopPodSandbox(id) - assert.NoError(t, err) + _, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: id}) + require.NoError(t, err) // IP not valid after sandbox stop expected.Network.Ip = "" - status, err = ds.PodSandboxStatus(id) - assert.Equal(t, expected, status) + statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id}) + require.NoError(t, err) + assert.Equal(t, expected, statusResp.Status) // Remove the container. - err = ds.RemovePodSandbox(id) - assert.NoError(t, err) - status, err = ds.PodSandboxStatus(id) - assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", status)) + _, err = ds.RemovePodSandbox(getTestCTX(), &runtimeapi.RemovePodSandboxRequest{PodSandboxId: id}) + require.NoError(t, err) + statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id}) + assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", statusResp)) } // TestSandboxStatusAfterRestart tests that retrieving sandbox status returns @@ -183,9 +187,10 @@ func TestSandboxStatusAfterRestart(t *testing.T) { // Check status without RunPodSandbox() having set up networking expected.Id = createResp.ID // ID is only known after the creation. - status, err := ds.PodSandboxStatus(createResp.ID) - assert.NoError(t, err) - assert.Equal(t, expected, status) + + statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: createResp.ID}) + require.NoError(t, err) + assert.Equal(t, expected, statusResp.Status) } // TestNetworkPluginInvocation checks that the right SetUpPod and TearDownPod @@ -212,10 +217,10 @@ func TestNetworkPluginInvocation(t *testing.T) { mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID) mockPlugin.EXPECT().TearDownPod(ns, name, cID).After(setup) - _, err := ds.RunPodSandbox(c) - assert.NoError(t, err) - err = ds.StopPodSandbox(cID.ID) - assert.NoError(t, err) + _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c}) + require.NoError(t, err) + _, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: cID.ID}) + require.NoError(t, err) } // TestHostNetworkPluginInvocation checks that *no* SetUp/TearDown calls happen @@ -244,9 +249,11 @@ func TestHostNetworkPluginInvocation(t *testing.T) { cID := kubecontainer.ContainerID{Type: runtimeName, ID: libdocker.GetFakeContainerID(fmt.Sprintf("/%v", makeSandboxName(c)))} // No calls to network plugin are expected - _, err := ds.RunPodSandbox(c) - assert.NoError(t, err) - assert.NoError(t, ds.StopPodSandbox(cID.ID)) + _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c}) + require.NoError(t, err) + + _, err = ds.StopPodSandbox(getTestCTX(), &runtimeapi.StopPodSandboxRequest{PodSandboxId: cID.ID}) + require.NoError(t, err) } // TestSetUpPodFailure checks that the sandbox should be not ready when it @@ -271,19 +278,19 @@ func TestSetUpPodFailure(t *testing.T) { mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID).Return(&network.PodNetworkStatus{IP: net.IP("127.0.0.01")}, nil).AnyTimes() t.Logf("RunPodSandbox should return error") - _, err := ds.RunPodSandbox(c) + _, err := ds.RunPodSandbox(getTestCTX(), &runtimeapi.RunPodSandboxRequest{Config: c}) assert.Error(t, err) t.Logf("PodSandboxStatus should be not ready") - status, err := ds.PodSandboxStatus(cID.ID) - assert.NoError(t, err) - assert.Equal(t, runtimeapi.PodSandboxState_SANDBOX_NOTREADY, status.State) + statusResp, err := ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: cID.ID}) + require.NoError(t, err) + assert.Equal(t, runtimeapi.PodSandboxState_SANDBOX_NOTREADY, statusResp.Status.State) t.Logf("ListPodSandbox should also show not ready") - sandboxes, err := ds.ListPodSandbox(nil) - assert.NoError(t, err) + listResp, err := ds.ListPodSandbox(getTestCTX(), &runtimeapi.ListPodSandboxRequest{}) + require.NoError(t, err) var sandbox *runtimeapi.PodSandbox - for _, s := range sandboxes { + for _, s := range listResp.Items { if s.Id == cID.ID { sandbox = s break diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 6fa1106f4fd..2256ade8a3f 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -18,21 +18,16 @@ package dockershim import ( "fmt" - "io" "net/http" - "strconv" "sync" "time" - "github.com/armon/circbuf" "github.com/blang/semver" dockertypes "github.com/docker/docker/api/types" "github.com/golang/glog" + "golang.org/x/net/context" "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubetypes "k8s.io/apimachinery/pkg/types" - internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" kubecm "k8s.io/kubernetes/pkg/kubelet/cm" @@ -86,6 +81,25 @@ const ( // to kubelet behavior and system settings in addition to any API flags that may be introduced. ) +// CRIService includes all methods necessary for a CRI server. +type CRIService interface { + runtimeapi.RuntimeServiceServer + runtimeapi.ImageServiceServer + Start() error +} + +// DockerService is an interface that embeds the new RuntimeService and +// ImageService interfaces. +type DockerService interface { + CRIService + + // For serving streaming calls. + http.Handler + + // For supporting legacy features. + DockerLegacyService +} + // NetworkPluginSettings is the subset of kubelet runtime args we pass // to the container runtime shim so it can probe for network plugins. // In the future we will feed these directly to a standalone container @@ -262,25 +276,6 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon return ds, nil } -// DockerService is an interface that embeds the new RuntimeService and -// ImageService interfaces. -type DockerService interface { - internalapi.RuntimeService - internalapi.ImageManagerService - Start() error - // For serving streaming calls. - http.Handler - - // IsCRISupportedLogDriver checks whether the logging driver used by docker is - // suppoted by native CRI integration. - // TODO(resouer): remove this when deprecating unsupported log driver - IsCRISupportedLogDriver() (bool, error) - - // NewDockerLegacyService created docker legacy service when log driver is not supported. - // TODO(resouer): remove this when deprecating unsupported log driver - NewDockerLegacyService() DockerLegacyService -} - type dockerService struct { client libdocker.Interface os kubecontainer.OSInterface @@ -309,8 +304,10 @@ type dockerService struct { disableSharedPID bool } +// TODO: handle context. + // Version returns the runtime name, runtime version and runtime API version -func (ds *dockerService) Version(_ string) (*runtimeapi.VersionResponse, error) { +func (ds *dockerService) Version(_ context.Context, r *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) { v, err := ds.getDockerVersion() if err != nil { return nil, err @@ -336,17 +333,20 @@ func (ds *dockerService) getDockerVersion() (*dockertypes.Version, error) { } // UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates. -func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) (err error) { +func (ds *dockerService) UpdateRuntimeConfig(_ context.Context, r *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) { + runtimeConfig := r.GetRuntimeConfig() if runtimeConfig == nil { - return + return &runtimeapi.UpdateRuntimeConfigResponse{}, nil } + glog.Infof("docker cri received runtime config %+v", runtimeConfig) if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" { event := make(map[string]interface{}) event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) } - return + + return &runtimeapi.UpdateRuntimeConfigResponse{}, nil } // GetNetNS returns the network namespace of the given containerID. The ID @@ -392,7 +392,7 @@ func (ds *dockerService) Start() error { // Status returns the status of the runtime. // TODO(random-liu): Set network condition accordingly here. -func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) { +func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { runtimeReady := &runtimeapi.RuntimeCondition{ Type: runtimeapi.RuntimeReady, Status: true, @@ -412,7 +412,8 @@ func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) { networkReady.Reason = "NetworkPluginNotReady" networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err) } - return &runtimeapi.RuntimeStatus{Conditions: conditions}, nil + status := &runtimeapi.RuntimeStatus{Conditions: conditions} + return &runtimeapi.StatusResponse{Status: status}, nil } func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -507,103 +508,3 @@ func toAPIProtocol(protocol Protocol) v1.Protocol { glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol) return v1.ProtocolTCP } - -// DockerLegacyService interface embeds some legacy methods for backward compatibility. -type DockerLegacyService interface { - // GetContainerLogs gets logs for a specific container. - GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error -} - -// dockerLegacyService implements the DockerLegacyService. We add this for non json-log driver -// support. (See #41996) -type dockerLegacyService struct { - client libdocker.Interface -} - -// NewDockerLegacyService created docker legacy service when log driver is not supported. -// TODO(resouer): remove this when deprecating unsupported log driver -func (d *dockerService) NewDockerLegacyService() DockerLegacyService { - return &dockerLegacyService{client: d.client} -} - -// GetContainerLogs get container logs directly from docker daemon. -func (d *dockerLegacyService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { - container, err := d.client.InspectContainer(containerID.ID) - if err != nil { - return err - } - - var since int64 - if logOptions.SinceSeconds != nil { - t := metav1.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) - since = t.Unix() - } - if logOptions.SinceTime != nil { - since = logOptions.SinceTime.Unix() - } - opts := dockertypes.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Since: strconv.FormatInt(since, 10), - Timestamps: logOptions.Timestamps, - Follow: logOptions.Follow, - } - if logOptions.TailLines != nil { - opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10) - } - - sopts := libdocker.StreamOptions{ - OutputStream: stdout, - ErrorStream: stderr, - RawTerminal: container.Config.Tty, - } - return d.client.Logs(containerID.ID, opts, sopts) -} - -// LegacyLogProvider implements the kuberuntime.LegacyLogProvider interface -type LegacyLogProvider struct { - dls DockerLegacyService -} - -func NewLegacyLogProvider(dls DockerLegacyService) LegacyLogProvider { - return LegacyLogProvider{dls: dls} -} - -// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength -// from the end of the log when docker is configured with a log driver other than json-log. -// It reads up to MaxContainerTerminationMessageLogLines lines. -func (l LegacyLogProvider) GetContainerLogTail(uid kubetypes.UID, name, namespace string, containerId kubecontainer.ContainerID) (string, error) { - value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) - buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) - // Although this is not a full spec pod, dockerLegacyService.GetContainerLogs() currently completely ignores its pod param - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: uid, - Name: name, - Namespace: namespace, - }, - } - err := l.dls.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) - if err != nil { - return "", err - } - return buf.String(), nil -} - -// criSupportedLogDrivers are log drivers supported by native CRI integration. -var criSupportedLogDrivers = []string{"json-file"} - -// IsCRISupportedLogDriver checks whether the logging driver used by docker is -// suppoted by native CRI integration. -func (d *dockerService) IsCRISupportedLogDriver() (bool, error) { - info, err := d.client.Info() - if err != nil { - return false, fmt.Errorf("failed to get docker info: %v", err) - } - for _, driver := range criSupportedLogDrivers { - if info.LoggingDriver == driver { - return true, nil - } - } - return false, nil -} diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index aa5ac6fa236..77a0f05c721 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -83,33 +83,33 @@ func TestStatus(t *testing.T) { } // Should report ready status if version returns no error. - status, err := ds.Status() - assert.NoError(t, err) + statusResp, err := ds.Status(getTestCTX(), &runtimeapi.StatusRequest{}) + require.NoError(t, err) assertStatus(map[string]bool{ runtimeapi.RuntimeReady: true, runtimeapi.NetworkReady: true, - }, status) + }, statusResp.Status) // Should not report ready status if version returns error. fDocker.InjectError("version", errors.New("test error")) - status, err = ds.Status() + statusResp, err = ds.Status(getTestCTX(), &runtimeapi.StatusRequest{}) assert.NoError(t, err) assertStatus(map[string]bool{ runtimeapi.RuntimeReady: false, runtimeapi.NetworkReady: true, - }, status) + }, statusResp.Status) // Should not report ready status is network plugin returns error. mockPlugin := newTestNetworkPlugin(t) ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() mockPlugin.EXPECT().Status().Return(errors.New("network error")) - status, err = ds.Status() + statusResp, err = ds.Status(getTestCTX(), &runtimeapi.StatusRequest{}) assert.NoError(t, err) assertStatus(map[string]bool{ runtimeapi.RuntimeReady: true, runtimeapi.NetworkReady: false, - }, status) + }, statusResp.Status) } func TestVersion(t *testing.T) { diff --git a/pkg/kubelet/dockershim/docker_stats_linux.go b/pkg/kubelet/dockershim/docker_stats_linux.go index 48622185f1b..e6bc458e217 100644 --- a/pkg/kubelet/dockershim/docker_stats_linux.go +++ b/pkg/kubelet/dockershim/docker_stats_linux.go @@ -20,15 +20,17 @@ package dockershim import ( "fmt" + + "golang.org/x/net/context" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ContainerStats returns stats for a container stats request based on container id. -func (ds *dockerService) ContainerStats(string) (*runtimeapi.ContainerStats, error) { +func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { return nil, fmt.Errorf("not implemented") } // ListContainerStats returns stats for a list container stats request based on a filter. -func (ds *dockerService) ListContainerStats(*runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { +func (ds *dockerService) ListContainerStats(_ context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/dockershim/docker_stats_unsupported.go b/pkg/kubelet/dockershim/docker_stats_unsupported.go index d9f7beef5c7..009362bb51a 100644 --- a/pkg/kubelet/dockershim/docker_stats_unsupported.go +++ b/pkg/kubelet/dockershim/docker_stats_unsupported.go @@ -21,15 +21,17 @@ package dockershim import ( "fmt" + "golang.org/x/net/context" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ContainerStats returns stats for a container stats request based on container id. -func (ds *dockerService) ContainerStats(string) (*runtimeapi.ContainerStats, error) { +func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { return nil, fmt.Errorf("not implemented") } // ListContainerStats returns stats for a list container stats request based on a filter. -func (ds *dockerService) ListContainerStats(*runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { +func (ds *dockerService) ListContainerStats(_ context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/dockershim/docker_stats_windows.go b/pkg/kubelet/dockershim/docker_stats_windows.go index 6f51239502e..49bd2b66716 100644 --- a/pkg/kubelet/dockershim/docker_stats_windows.go +++ b/pkg/kubelet/dockershim/docker_stats_windows.go @@ -21,20 +21,23 @@ package dockershim import ( "time" + "golang.org/x/net/context" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ContainerStats returns stats for a container stats request based on container id. -func (ds *dockerService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { - containerStats, err := ds.getContainerStats(containerID) +func (ds *dockerService) ContainerStats(_ context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { + stats, err := ds.getContainerStats(r.ContainerId) if err != nil { return nil, err } - return containerStats, nil + return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil } // ListContainerStats returns stats for a list container stats request based on a filter. -func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { +func (ds *dockerService) ListContainerStats(ctx context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { + containerStatsFilter := r.GetFilter() filter := &runtimeapi.ContainerFilter{} if containerStatsFilter != nil { @@ -43,13 +46,13 @@ func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.Con filter.LabelSelector = containerStatsFilter.LabelSelector } - containers, err := ds.ListContainers(filter) + listResp, err := ds.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter}) if err != nil { return nil, err } var stats []*runtimeapi.ContainerStats - for _, container := range containers { + for _, container := range listResp.Containers { containerStats, err := ds.getContainerStats(container.Id) if err != nil { return nil, err @@ -58,7 +61,7 @@ func (ds *dockerService) ListContainerStats(containerStatsFilter *runtimeapi.Con stats = append(stats, containerStats) } - return stats, nil + return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil } func (ds *dockerService) getContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { @@ -72,10 +75,11 @@ func (ds *dockerService) getContainerStats(containerID string) (*runtimeapi.Cont return nil, err } - status, err := ds.ContainerStatus(containerID) + statusResp, err := ds.ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{ContainerId: containerID}) if err != nil { return nil, err } + status := statusResp.GetStatus() dockerStats := statsJSON.Stats timestamp := time.Now().UnixNano() diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go index 4c4b74185ab..c55e0829cd0 100644 --- a/pkg/kubelet/dockershim/docker_streaming.go +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -26,14 +26,15 @@ import ( "time" dockertypes "github.com/docker/docker/api/types" - "github.com/golang/glog" + "golang.org/x/net/context" "k8s.io/client-go/tools/remotecommand" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/util/ioutils" + utilexec "k8s.io/utils/exec" "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" ) @@ -76,20 +77,35 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. -func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { +func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) { + timeout := time.Duration(req.Timeout) * time.Second var stdoutBuffer, stderrBuffer bytes.Buffer - err = ds.streamingRuntime.exec(containerID, cmd, + err := ds.streamingRuntime.exec(req.ContainerId, req.Cmd, nil, // in ioutils.WriteCloserWrapper(&stdoutBuffer), ioutils.WriteCloserWrapper(&stderrBuffer), false, // tty nil, // resize timeout) - return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err + + var exitCode int32 + if err != nil { + exitError, ok := err.(utilexec.ExitError) + if !ok { + return nil, err + } + + exitCode = int32(exitError.ExitStatus()) + } + return &runtimeapi.ExecSyncResponse{ + Stdout: stdoutBuffer.Bytes(), + Stderr: stderrBuffer.Bytes(), + ExitCode: exitCode, + }, nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. -func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { +func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { if ds.streamingServer == nil { return nil, streaming.ErrorStreamingDisabled("exec") } @@ -101,7 +117,7 @@ func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResp } // Attach prepares a streaming endpoint to attach to a running container, and returns the address. -func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { +func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { if ds.streamingServer == nil { return nil, streaming.ErrorStreamingDisabled("attach") } @@ -113,7 +129,7 @@ func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.Atta } // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. -func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { +func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { if ds.streamingServer == nil { return nil, streaming.ErrorStreamingDisabled("port forward") } diff --git a/pkg/kubelet/dockershim/remote/BUILD b/pkg/kubelet/dockershim/remote/BUILD index 2d320253698..b44e4d39d66 100644 --- a/pkg/kubelet/dockershim/remote/BUILD +++ b/pkg/kubelet/dockershim/remote/BUILD @@ -7,21 +7,15 @@ load( go_library( name = "go_default_library", - srcs = [ - "docker_server.go", - "docker_service.go", - ], + srcs = ["docker_server.go"], importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/remote", deps = [ - "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/util:go_default_library", "//pkg/util/interrupt:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/golang.org/x/net/context:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", - "//vendor/k8s.io/utils/exec:go_default_library", ], ) diff --git a/pkg/kubelet/dockershim/remote/docker_server.go b/pkg/kubelet/dockershim/remote/docker_server.go index 21865e3e2c1..b1185005602 100644 --- a/pkg/kubelet/dockershim/remote/docker_server.go +++ b/pkg/kubelet/dockershim/remote/docker_server.go @@ -33,21 +33,27 @@ type DockerServer struct { // endpoint is the endpoint to serve on. endpoint string // service is the docker service which implements runtime and image services. - service DockerService + service dockershim.CRIService // server is the grpc server. server *grpc.Server } // NewDockerServer creates the dockershim grpc server. -func NewDockerServer(endpoint string, s dockershim.DockerService) *DockerServer { +func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer { return &DockerServer{ endpoint: endpoint, - service: NewDockerService(s), + service: s, } } // Start starts the dockershim grpc server. func (s *DockerServer) Start() error { + // Start the internal service. + if err := s.service.Start(); err != nil { + glog.Errorf("Unable to start docker service") + return err + } + glog.V(2).Infof("Start dockershim grpc server") l, err := util.CreateListener(s.endpoint) if err != nil { diff --git a/pkg/kubelet/dockershim/remote/docker_service.go b/pkg/kubelet/dockershim/remote/docker_service.go deleted file mode 100644 index 2d85f179000..00000000000 --- a/pkg/kubelet/dockershim/remote/docker_service.go +++ /dev/null @@ -1,249 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package remote - -import ( - "time" - - "golang.org/x/net/context" - - internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" - runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - "k8s.io/kubernetes/pkg/kubelet/dockershim" - utilexec "k8s.io/utils/exec" -) - -// DockerService is the interface implement CRI remote service server. -type DockerService interface { - runtimeapi.RuntimeServiceServer - runtimeapi.ImageServiceServer -} - -// dockerService uses dockershim service to implement DockerService. -// Notice that the contexts in the functions are not used now. -// TODO(random-liu): Change the dockershim service to support context, and implement -// internal services and remote services with the dockershim service. -type dockerService struct { - runtimeService internalapi.RuntimeService - imageService internalapi.ImageManagerService -} - -func NewDockerService(s dockershim.DockerService) DockerService { - return &dockerService{runtimeService: s, imageService: s} -} - -func (d *dockerService) Version(ctx context.Context, r *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) { - return d.runtimeService.Version(r.Version) -} - -func (d *dockerService) Status(ctx context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { - status, err := d.runtimeService.Status() - if err != nil { - return nil, err - } - return &runtimeapi.StatusResponse{Status: status}, nil -} - -func (d *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { - podSandboxId, err := d.runtimeService.RunPodSandbox(r.GetConfig()) - if err != nil { - return nil, err - } - return &runtimeapi.RunPodSandboxResponse{PodSandboxId: podSandboxId}, nil -} - -func (d *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) { - err := d.runtimeService.StopPodSandbox(r.PodSandboxId) - if err != nil { - return nil, err - } - return &runtimeapi.StopPodSandboxResponse{}, nil -} - -func (d *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) { - err := d.runtimeService.RemovePodSandbox(r.PodSandboxId) - if err != nil { - return nil, err - } - return &runtimeapi.RemovePodSandboxResponse{}, nil -} - -func (d *dockerService) PodSandboxStatus(ctx context.Context, r *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) { - podSandboxStatus, err := d.runtimeService.PodSandboxStatus(r.PodSandboxId) - if err != nil { - return nil, err - } - return &runtimeapi.PodSandboxStatusResponse{Status: podSandboxStatus}, nil -} - -func (d *dockerService) ListPodSandbox(ctx context.Context, r *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) { - items, err := d.runtimeService.ListPodSandbox(r.GetFilter()) - if err != nil { - return nil, err - } - return &runtimeapi.ListPodSandboxResponse{Items: items}, nil -} - -func (d *dockerService) CreateContainer(ctx context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) { - containerId, err := d.runtimeService.CreateContainer(r.PodSandboxId, r.GetConfig(), r.GetSandboxConfig()) - if err != nil { - return nil, err - } - return &runtimeapi.CreateContainerResponse{ContainerId: containerId}, nil -} - -func (d *dockerService) StartContainer(ctx context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) { - err := d.runtimeService.StartContainer(r.ContainerId) - if err != nil { - return nil, err - } - return &runtimeapi.StartContainerResponse{}, nil -} - -func (d *dockerService) StopContainer(ctx context.Context, r *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) { - err := d.runtimeService.StopContainer(r.ContainerId, r.Timeout) - if err != nil { - return nil, err - } - return &runtimeapi.StopContainerResponse{}, nil -} - -func (d *dockerService) RemoveContainer(ctx context.Context, r *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) { - err := d.runtimeService.RemoveContainer(r.ContainerId) - if err != nil { - return nil, err - } - return &runtimeapi.RemoveContainerResponse{}, nil -} - -func (d *dockerService) ListContainers(ctx context.Context, r *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) { - containers, err := d.runtimeService.ListContainers(r.GetFilter()) - if err != nil { - return nil, err - } - return &runtimeapi.ListContainersResponse{Containers: containers}, nil -} - -func (d *dockerService) ContainerStatus(ctx context.Context, r *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) { - status, err := d.runtimeService.ContainerStatus(r.ContainerId) - if err != nil { - return nil, err - } - return &runtimeapi.ContainerStatusResponse{Status: status}, nil -} - -func (d *dockerService) UpdateContainerResources(ctx context.Context, r *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) { - err := d.runtimeService.UpdateContainerResources(r.ContainerId, r.Linux) - if err != nil { - return nil, err - } - return &runtimeapi.UpdateContainerResourcesResponse{}, nil -} - -func (d *dockerService) ExecSync(ctx context.Context, r *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) { - stdout, stderr, err := d.runtimeService.ExecSync(r.ContainerId, r.Cmd, time.Duration(r.Timeout)*time.Second) - var exitCode int32 - if err != nil { - exitError, ok := err.(utilexec.ExitError) - if !ok { - return nil, err - } - exitCode = int32(exitError.ExitStatus()) - } - return &runtimeapi.ExecSyncResponse{ - Stdout: stdout, - Stderr: stderr, - ExitCode: exitCode, - }, nil -} - -func (d *dockerService) Exec(ctx context.Context, r *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { - return d.runtimeService.Exec(r) -} - -func (d *dockerService) Attach(ctx context.Context, r *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { - return d.runtimeService.Attach(r) -} - -func (d *dockerService) PortForward(ctx context.Context, r *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { - return d.runtimeService.PortForward(r) -} - -func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) { - err := d.runtimeService.UpdateRuntimeConfig(r.GetRuntimeConfig()) - if err != nil { - return nil, err - } - return &runtimeapi.UpdateRuntimeConfigResponse{}, nil -} - -func (d *dockerService) ListImages(ctx context.Context, r *runtimeapi.ListImagesRequest) (*runtimeapi.ListImagesResponse, error) { - images, err := d.imageService.ListImages(r.GetFilter()) - if err != nil { - return nil, err - } - return &runtimeapi.ListImagesResponse{Images: images}, nil -} - -func (d *dockerService) ImageStatus(ctx context.Context, r *runtimeapi.ImageStatusRequest) (*runtimeapi.ImageStatusResponse, error) { - image, err := d.imageService.ImageStatus(r.GetImage()) - if err != nil { - return nil, err - } - return &runtimeapi.ImageStatusResponse{Image: image}, nil -} - -func (d *dockerService) PullImage(ctx context.Context, r *runtimeapi.PullImageRequest) (*runtimeapi.PullImageResponse, error) { - image, err := d.imageService.PullImage(r.GetImage(), r.GetAuth()) - if err != nil { - return nil, err - } - return &runtimeapi.PullImageResponse{ImageRef: image}, nil -} - -func (d *dockerService) RemoveImage(ctx context.Context, r *runtimeapi.RemoveImageRequest) (*runtimeapi.RemoveImageResponse, error) { - err := d.imageService.RemoveImage(r.GetImage()) - if err != nil { - return nil, err - } - return &runtimeapi.RemoveImageResponse{}, nil -} - -// ImageFsInfo returns information of the filesystem that is used to store images. -func (d *dockerService) ImageFsInfo(ctx context.Context, r *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { - filesystems, err := d.imageService.ImageFsInfo() - if err != nil { - return nil, err - } - return &runtimeapi.ImageFsInfoResponse{ImageFilesystems: filesystems}, nil -} - -func (d *dockerService) ContainerStats(ctx context.Context, r *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { - stats, err := d.runtimeService.ContainerStats(r.ContainerId) - if err != nil { - return nil, err - } - return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil -} - -func (d *dockerService) ListContainerStats(ctx context.Context, r *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { - stats, err := d.runtimeService.ListContainerStats(r.GetFilter()) - if err != nil { - return nil, err - } - return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0368ecc8ece..e71ae01e744 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -610,9 +610,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if err != nil { return nil, err } - if err := ds.Start(); err != nil { - return nil, err - } // For now, the CRI shim redirects the streaming requests to the // kubelet, which handles the requests using DockerService.. klet.criHandler = ds @@ -633,8 +630,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, return nil, err } if !supported { - klet.dockerLegacyService = ds.NewDockerLegacyService() - legacyLogProvider = dockershim.NewLegacyLogProvider(klet.dockerLegacyService) + klet.dockerLegacyService = ds + legacyLogProvider = ds } case kubetypes.RemoteContainerRuntime: // No-op.