diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 3c1c0ab367f..e37ad98ae1f 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -195,7 +195,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&s.AllowedUnsafeSysctls, "experimental-allowed-unsafe-sysctls", s.AllowedUnsafeSysctls, "Comma-separated whitelist of unsafe sysctls or unsafe sysctl patterns (ending in *). Use these at your own risk.") // Flags intended for testing, not recommended used in production environments. - fs.StringVar(&s.RemoteRuntimeEndpoint, "container-runtime-endpoint", s.RemoteRuntimeEndpoint, "The unix socket endpoint of remote runtime service. If not empty, this option will override --container-runtime. This is an experimental feature. Intended for testing only.") + fs.StringVar(&s.RemoteRuntimeEndpoint, "container-runtime-endpoint", s.RemoteRuntimeEndpoint, "The unix socket endpoint of remote runtime service. This is an experimental feature. Intended for testing only.") fs.StringVar(&s.RemoteImageEndpoint, "image-service-endpoint", s.RemoteImageEndpoint, "The unix socket endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. This is an experimental feature. Intended for testing only.") fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.") fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]") diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 832bd01dac2..d04f69ac49d 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -55,7 +55,7 @@ const ( var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey} // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. -func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerLegacyService { +func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService { return &dockerService{ seccompProfileRoot: seccompProfileRoot, client: dockertools.NewInstrumentedDockerInterface(client), @@ -64,13 +64,18 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str } } -// DockerLegacyService is an interface that embeds both the new -// RuntimeService and ImageService interfaces, while including legacy methods -// for backward compatibility. -type DockerLegacyService interface { +// DockerService is an interface that embeds both the new RuntimeService and +// ImageService interfaces, while including DockerLegacyService for backward +// compatibility. +type DockerService interface { internalApi.RuntimeService internalApi.ImageManagerService + DockerLegacyService +} +// DockerLegacyService is an interface that embeds all legacy methods for +// backward compatibility. +type DockerLegacyService interface { // Supporting legacy methods for docker. GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) kubecontainer.ContainerAttacher diff --git a/pkg/kubelet/dockershim/remote/docker_server.go b/pkg/kubelet/dockershim/remote/docker_server.go new file mode 100644 index 00000000000..2120926c147 --- /dev/null +++ b/pkg/kubelet/dockershim/remote/docker_server.go @@ -0,0 +1,89 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "fmt" + "net" + "os" + "syscall" + + "github.com/golang/glog" + "google.golang.org/grpc" + + runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/dockershim" + "k8s.io/kubernetes/pkg/util/interrupt" +) + +const ( + // defaultEndpoint is the default address of dockershim grpc server socket. + defaultAddress = "/var/run/dockershim.sock" + // unixProtocol is the network protocol of unix socket. + unixProtocol = "unix" +) + +// DockerServer is the grpc server of dockershim. +type DockerServer struct { + // addr is the address to serve on. + addr string + // service is the docker service which implements runtime and image services. + service DockerService + // server is the grpc server. + server *grpc.Server +} + +// NewDockerServer creates the dockershim grpc server. +func NewDockerServer(addr string, s dockershim.DockerService) *DockerServer { + return &DockerServer{ + addr: addr, + service: NewDockerService(s), + } +} + +// Start starts the dockershim grpc server. +func (s *DockerServer) Start() error { + glog.V(2).Infof("Start dockershim grpc server") + // Unlink to cleanup the previous socket file. + err := syscall.Unlink(s.addr) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to unlink socket file %q: %v", s.addr, err) + } + l, err := net.Listen(unixProtocol, s.addr) + if err != nil { + return fmt.Errorf("failed to listen on %q: %v", s.addr, err) + } + // Create the grpc server and register runtime and image services. + s.server = grpc.NewServer() + runtimeApi.RegisterRuntimeServiceServer(s.server, s.service) + runtimeApi.RegisterImageServiceServer(s.server, s.service) + go func() { + // Use interrupt handler to make sure the server to be stopped properly. + h := interrupt.New(nil, s.Stop) + err := h.Run(func() error { return s.server.Serve(l) }) + if err != nil { + glog.Errorf("Failed to serve connections: %v", err) + } + }() + return nil +} + +// Stop stops the dockershim grpc server. +func (s *DockerServer) Stop() { + glog.V(2).Infof("Stop docker server") + s.server.Stop() +} diff --git a/pkg/kubelet/dockershim/remote/docker_service.go b/pkg/kubelet/dockershim/remote/docker_service.go new file mode 100644 index 00000000000..03ce2ff796e --- /dev/null +++ b/pkg/kubelet/dockershim/remote/docker_service.go @@ -0,0 +1,194 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "fmt" + + "golang.org/x/net/context" + + internalApi "k8s.io/kubernetes/pkg/kubelet/api" + runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/dockershim" +) + +// DockerService is the interface implement CRI remote service server. +type DockerService interface { + runtimeApi.RuntimeServiceServer + runtimeApi.ImageServiceServer +} + +// dockerService uses dockershim service to implement DockerService. +// Notice that the contexts in the functions are not used now. +// TODO(random-liu): Change the dockershim service to support context, and implement +// internal services and remote services with the dockershim service. +type dockerService struct { + runtimeService internalApi.RuntimeService + imageService internalApi.ImageManagerService +} + +func NewDockerService(s dockershim.DockerService) DockerService { + return &dockerService{runtimeService: s, imageService: s} +} + +func (d *dockerService) Version(ctx context.Context, r *runtimeApi.VersionRequest) (*runtimeApi.VersionResponse, error) { + return d.runtimeService.Version(r.GetVersion()) +} + +func (d *dockerService) RunPodSandbox(ctx context.Context, r *runtimeApi.RunPodSandboxRequest) (*runtimeApi.RunPodSandboxResponse, error) { + podSandboxId, err := d.runtimeService.RunPodSandbox(r.GetConfig()) + if err != nil { + return nil, err + } + return &runtimeApi.RunPodSandboxResponse{PodSandboxId: &podSandboxId}, nil +} + +func (d *dockerService) StopPodSandbox(ctx context.Context, r *runtimeApi.StopPodSandboxRequest) (*runtimeApi.StopPodSandboxResponse, error) { + err := d.runtimeService.StopPodSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, err + } + return &runtimeApi.StopPodSandboxResponse{}, nil +} + +func (d *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeApi.RemovePodSandboxRequest) (*runtimeApi.RemovePodSandboxResponse, error) { + err := d.runtimeService.RemovePodSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, err + } + return &runtimeApi.RemovePodSandboxResponse{}, nil +} + +func (d *dockerService) PodSandboxStatus(ctx context.Context, r *runtimeApi.PodSandboxStatusRequest) (*runtimeApi.PodSandboxStatusResponse, error) { + podSandboxStatus, err := d.runtimeService.PodSandboxStatus(r.GetPodSandboxId()) + if err != nil { + return nil, err + } + return &runtimeApi.PodSandboxStatusResponse{Status: podSandboxStatus}, nil +} + +func (d *dockerService) ListPodSandbox(ctx context.Context, r *runtimeApi.ListPodSandboxRequest) (*runtimeApi.ListPodSandboxResponse, error) { + items, err := d.runtimeService.ListPodSandbox(r.GetFilter()) + if err != nil { + return nil, err + } + return &runtimeApi.ListPodSandboxResponse{Items: items}, nil +} + +func (d *dockerService) CreateContainer(ctx context.Context, r *runtimeApi.CreateContainerRequest) (*runtimeApi.CreateContainerResponse, error) { + containerId, err := d.runtimeService.CreateContainer(r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig()) + if err != nil { + return nil, err + } + return &runtimeApi.CreateContainerResponse{ContainerId: &containerId}, nil +} + +func (d *dockerService) StartContainer(ctx context.Context, r *runtimeApi.StartContainerRequest) (*runtimeApi.StartContainerResponse, error) { + err := d.runtimeService.StartContainer(r.GetContainerId()) + if err != nil { + return nil, err + } + return &runtimeApi.StartContainerResponse{}, nil +} + +func (d *dockerService) StopContainer(ctx context.Context, r *runtimeApi.StopContainerRequest) (*runtimeApi.StopContainerResponse, error) { + err := d.runtimeService.StopContainer(r.GetContainerId(), r.GetTimeout()) + if err != nil { + return nil, err + } + return &runtimeApi.StopContainerResponse{}, nil +} + +func (d *dockerService) RemoveContainer(ctx context.Context, r *runtimeApi.RemoveContainerRequest) (*runtimeApi.RemoveContainerResponse, error) { + err := d.runtimeService.RemoveContainer(r.GetContainerId()) + if err != nil { + return nil, err + } + return &runtimeApi.RemoveContainerResponse{}, nil +} + +func (d *dockerService) ListContainers(ctx context.Context, r *runtimeApi.ListContainersRequest) (*runtimeApi.ListContainersResponse, error) { + containers, err := d.runtimeService.ListContainers(r.GetFilter()) + if err != nil { + return nil, err + } + return &runtimeApi.ListContainersResponse{Containers: containers}, nil +} + +func (d *dockerService) ContainerStatus(ctx context.Context, r *runtimeApi.ContainerStatusRequest) (*runtimeApi.ContainerStatusResponse, error) { + status, err := d.runtimeService.ContainerStatus(r.GetContainerId()) + if err != nil { + return nil, err + } + return &runtimeApi.ContainerStatusResponse{Status: status}, nil +} + +func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequest) (*runtimeApi.ExecSyncResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) { + err := d.runtimeService.UpdateRuntimeConfig(r.GetRuntimeConfig()) + if err != nil { + return nil, err + } + return &runtimeApi.UpdateRuntimeConfigResponse{}, nil +} + +func (d *dockerService) ListImages(ctx context.Context, r *runtimeApi.ListImagesRequest) (*runtimeApi.ListImagesResponse, error) { + images, err := d.imageService.ListImages(r.GetFilter()) + if err != nil { + return nil, err + } + return &runtimeApi.ListImagesResponse{Images: images}, nil +} + +func (d *dockerService) ImageStatus(ctx context.Context, r *runtimeApi.ImageStatusRequest) (*runtimeApi.ImageStatusResponse, error) { + image, err := d.imageService.ImageStatus(r.GetImage()) + if err != nil { + return nil, err + } + return &runtimeApi.ImageStatusResponse{Image: image}, nil +} + +func (d *dockerService) PullImage(ctx context.Context, r *runtimeApi.PullImageRequest) (*runtimeApi.PullImageResponse, error) { + err := d.imageService.PullImage(r.GetImage(), r.GetAuth()) + if err != nil { + return nil, err + } + return &runtimeApi.PullImageResponse{}, nil +} + +func (d *dockerService) RemoveImage(ctx context.Context, r *runtimeApi.RemoveImageRequest) (*runtimeApi.RemoveImageResponse, error) { + err := d.imageService.RemoveImage(r.GetImage()) + if err != nil { + return nil, err + } + return &runtimeApi.RemoveImageResponse{}, nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e6269f04c42..5246a55e0f8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -39,11 +39,13 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/fields" + internalApi "k8s.io/kubernetes/pkg/kubelet/api" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim" + dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/eviction" @@ -475,8 +477,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) if kubeCfg.RemoteRuntimeEndpoint != "" { - kubeCfg.ContainerRuntime = "remote" - // kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified if kubeCfg.RemoteImageEndpoint == "" { kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint @@ -488,7 +488,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub case "docker": switch kubeCfg.ExperimentalRuntimeIntegrationType { case "cri": - // Use the new CRI shim for docker. This is need for testing the + // Use the new CRI shim for docker. This is needed for testing the // docker integration through CRI, and may be removed in the future. dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage) klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( @@ -512,6 +512,53 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub if err != nil { return nil, err } + case "remote": + // kubelet will talk to the shim over a unix socket using grpc. This may become the default in the near future. + dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage) + // Start the in process dockershim grpc server. + server := dockerremote.NewDockerServer(kubeCfg.RemoteRuntimeEndpoint, dockerService) + if err := server.Start(); err != nil { + return nil, err + } + // Start the remote kuberuntime manager. + remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) + if err != nil { + return nil, err + } + remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) + if err != nil { + return nil, err + } + klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( + kubecontainer.FilterEventRecorder(kubeDeps.Recorder), + klet.livenessManager, + containerRefManager, + machineInfo, + klet.podManager, + kubeDeps.OSInterface, + klet.networkPlugin, + klet, + klet.httpClient, + imageBackOff, + kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), + klet.cpuCFSQuota, + // Use DockerLegacyService directly to workaround unimplemented functions. + // We add short hack here to keep other code clean. + // TODO: Remove this hack after CRI is fully designed and implemented. + &struct { + internalApi.RuntimeService + dockershim.DockerLegacyService + }{ + RuntimeService: remoteRuntimeService, + DockerLegacyService: dockerService, + }, + remoteImageService, + ) + if err != nil { + return nil, err + } default: // Only supported one for now, continue. klet.containerRuntime = dockertools.NewDockerManager( diff --git a/pkg/kubelet/remote/remote_image.go b/pkg/kubelet/remote/remote_image.go index 7ba51c49eb6..ed4e3a393db 100644 --- a/pkg/kubelet/remote/remote_image.go +++ b/pkg/kubelet/remote/remote_image.go @@ -34,7 +34,7 @@ type RemoteImageService struct { // NewRemoteImageService creates a new internalApi.ImageManagerService. func NewRemoteImageService(addr string, connectionTimout time.Duration) (internalApi.ImageManagerService, error) { glog.V(3).Infof("Connecting to image service %s", addr) - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial)) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial)) if err != nil { glog.Errorf("Connect remote image service %s failed: %v", addr, err) return nil, err diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index 7bebcf8ed2e..e9ec276702b 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -36,7 +36,7 @@ type RemoteRuntimeService struct { // NewRemoteRuntimeService creates a new internalApi.RuntimeService. func NewRemoteRuntimeService(addr string, connectionTimout time.Duration) (internalApi.RuntimeService, error) { glog.V(3).Infof("Connecting to runtime service %s", addr) - conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial)) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial)) if err != nil { glog.Errorf("Connect remote runtime %s failed: %v", addr, err) return nil, err diff --git a/test/e2e/framework/test_context.go b/test/e2e/framework/test_context.go index 4f4689ced4f..2334bf9aa46 100644 --- a/test/e2e/framework/test_context.go +++ b/test/e2e/framework/test_context.go @@ -114,6 +114,9 @@ type NodeTestContextType struct { PrepullImages bool // RuntimeIntegrationType indicates how runtime is integrated with Kubelet. This is mainly used for CRI validation test. RuntimeIntegrationType string + // ContainerRuntimeEndpoint is the endpoint of remote container runtime grpc server. This is mainly used for Remote CRI + // validation test. + ContainerRuntimeEndpoint string // MounterPath is the path to the program to run to perform a mount MounterPath string } @@ -211,6 +214,7 @@ func RegisterNodeFlags() { flag.StringVar(&TestContext.ManifestPath, "manifest-path", "", "The path to the static pod manifest file.") flag.BoolVar(&TestContext.PrepullImages, "prepull-images", true, "If true, prepull images so image pull failures do not cause test failures.") flag.StringVar(&TestContext.RuntimeIntegrationType, "runtime-integration-type", "", "Choose the integration path for the container runtime, mainly used for CRI validation.") + flag.StringVar(&TestContext.ContainerRuntimeEndpoint, "container-runtime-endpoint", "", "The endpoint of remote container runtime grpc server, mainly used for Remote CRI validation.") flag.StringVar(&TestContext.MounterPath, "mounter-path", "", "Path of mounter binary. Leave empty to use the default mount.") } diff --git a/test/e2e_node/services/services.go b/test/e2e_node/services/services.go index abe475c6246..14e6ab310a4 100644 --- a/test/e2e_node/services/services.go +++ b/test/e2e_node/services/services.go @@ -218,6 +218,9 @@ func (e *E2EServices) startKubelet() (*server, error) { cmdArgs = append(cmdArgs, "--experimental-runtime-integration-type", framework.TestContext.RuntimeIntegrationType) // Whether to use experimental cri integration. } + if framework.TestContext.ContainerRuntimeEndpoint != "" { + cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint) + } if framework.TestContext.CgroupsPerQOS { // TODO: enable this when the flag is stable and available in kubelet. // cmdArgs = append(cmdArgs,