diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3ddf5edf1ba..6c807c43c3f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -253,6 +253,18 @@ func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps return cfg, nil } +func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (internalApi.RuntimeService, internalApi.ImageManagerService, error) { + rs, err := remote.NewRemoteRuntimeService(config.RemoteRuntimeEndpoint, config.RuntimeRequestTimeout.Duration) + if err != nil { + return nil, nil, err + } + is, err := remote.NewRemoteImageService(config.RemoteImageEndpoint, config.RuntimeRequestTimeout.Duration) + if err != nil { + return nil, nil, err + } + return rs, is, err +} + // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // No initialization of Kubelet and its modules should happen here. func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) { @@ -502,90 +514,101 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub var nl *noOpLegacyHost pluginSettings.LegacyRuntimeHost = nl - // Initialize the runtime. - switch kubeCfg.ContainerRuntime { - case "docker": - switch kubeCfg.ExperimentalRuntimeIntegrationType { - case "cri": - // Use the new CRI shim for docker. This is needed for testing the - // docker integration through CRI, and may be removed in the future. - dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups) + if kubeCfg.ExperimentalRuntimeIntegrationType == "cri" { + // kubelet defers to the runtime shim to setup networking. Setting + // this to nil will prevent it from trying to invoke the plugin. + // It's easier to always probe and initialize plugins till cri + // becomes the default. + klet.networkPlugin = nil + + var runtimeService internalApi.RuntimeService + var imageService internalApi.ImageManagerService + var err error + + switch kubeCfg.ContainerRuntime { + case "docker": + // Use the new CRI shim for docker. + ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups) if err != nil { return nil, err } - runtimeService := dockerService.(internalApi.RuntimeService) - imageService := dockerService.(internalApi.ImageManagerService) + // TODO: Once we switch to grpc completely, we should move this + // call to the grpc server start. + if err := ds.Start(); err != nil { + return nil, err + } - // This is a temporary knob to easily switch between grpc and non-grpc integration. grpc - // will be enabled if this is not empty. - // TODO(random-liu): Remove the temporary knob after grpc integration is stabilized and - // pass the runtime endpoint through kubelet flags. - remoteEndpoint := "/var/run/dockershim.sock" + rs := ds.(internalApi.RuntimeService) + is := ds.(internalApi.ImageManagerService) + // This is an internal knob to switch between grpc and non-grpc + // integration. + // TODO: Remove this knob once we switch to using GRPC completely. + overGRPC := true + if overGRPC { + const ( + // The unix socket for kubelet <-> dockershim communication. + ep = "/var/run/dockershim.sock" + ) + kubeCfg.RemoteRuntimeEndpoint = ep + kubeCfg.RemoteImageEndpoint = ep - // If the remote runtime endpoint is set, use the grpc integration. - if remoteEndpoint != "" { - // Start the in process dockershim grpc server. - server := dockerremote.NewDockerServer(remoteEndpoint, dockerService) + server := dockerremote.NewDockerServer(ep, ds) + glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") err := server.Start() if err != nil { return nil, err } - // Start the remote kuberuntime manager. - runtimeService, err = remote.NewRemoteRuntimeService(remoteEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) - if err != nil { - return nil, err - } - imageService, err = remote.NewRemoteImageService(remoteEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) + rs, is, err = getRuntimeAndImageServices(kubeCfg) if err != nil { return nil, err } } - // TODO: Find a better place to start the service. - if err := dockerService.Start(); err != nil { - return nil, err + // Use DockerLegacyService directly to work around unimplemented + // functions in CRI. + // TODO: Remove this hack after CRI is fully implemented. + // TODO: Move the instrumented interface wrapping into kuberuntime. + runtimeService = &struct { + internalApi.RuntimeService + dockershim.DockerLegacyService + }{ + RuntimeService: kuberuntime.NewInstrumentedRuntimeService(rs), + DockerLegacyService: ds, } - - // kubelet defers to the runtime shim to setup networking. Setting - // this to nil will prevent it from trying to invoke the plugin. - // It's easier to always probe and initialize plugins till cri - // becomes the default. - klet.networkPlugin = nil - - runtime, err := kuberuntime.NewKubeGenericRuntimeManager( - kubecontainer.FilterEventRecorder(kubeDeps.Recorder), - klet.livenessManager, - containerRefManager, - machineInfo, - klet.podManager, - kubeDeps.OSInterface, - klet.networkPlugin, - klet, - klet.httpClient, - imageBackOff, - kubeCfg.SerializeImagePulls, - float32(kubeCfg.RegistryPullQPS), - int(kubeCfg.RegistryBurst), - klet.cpuCFSQuota, - // Use DockerLegacyService directly to workaround unimplemented functions. - // We add short hack here to keep other code clean. - // TODO: Remove this hack after CRI is fully designed and implemented. - // TODO: Move the instrumented interface wrapping into kuberuntime. - &struct { - internalApi.RuntimeService - dockershim.DockerLegacyService - }{ - RuntimeService: kuberuntime.NewInstrumentedRuntimeService(runtimeService), - DockerLegacyService: dockerService, - }, - kuberuntime.NewInstrumentedImageManagerService(imageService), - ) + imageService = is + case "remote": + runtimeService, imageService, err = getRuntimeAndImageServices(kubeCfg) if err != nil { return nil, err } - klet.containerRuntime = runtime - klet.runner = runtime default: - // Only supported one for now, continue. + return nil, fmt.Errorf("unsupported CRI runtime: %q", kubeCfg.ContainerRuntime) + } + runtime, err := kuberuntime.NewKubeGenericRuntimeManager( + kubecontainer.FilterEventRecorder(kubeDeps.Recorder), + klet.livenessManager, + containerRefManager, + machineInfo, + klet.podManager, + kubeDeps.OSInterface, + klet.networkPlugin, + klet, + klet.httpClient, + imageBackOff, + kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), + klet.cpuCFSQuota, + runtimeService, + kuberuntime.NewInstrumentedImageManagerService(imageService), + ) + if err != nil { + return nil, err + } + klet.containerRuntime = runtime + klet.runner = runtime + } else { + switch kubeCfg.ContainerRuntime { + case "docker": runtime := dockertools.NewDockerManager( kubeDeps.DockerClient, kubecontainer.FilterEventRecorder(kubeDeps.Recorder), @@ -620,72 +643,40 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub ) klet.containerRuntime = runtime klet.runner = kubecontainer.DirectStreamingRunner(runtime) + case "rkt": + // TODO: Include hairpin mode settings in rkt? + conf := &rkt.Config{ + Path: kubeCfg.RktPath, + Stage1Image: kubeCfg.RktStage1Image, + InsecureOptions: "image,ondisk", + } + runtime, err := rkt.New( + kubeCfg.RktAPIEndpoint, + conf, + klet, + kubeDeps.Recorder, + containerRefManager, + klet.podManager, + klet.livenessManager, + klet.httpClient, + klet.networkPlugin, + klet.hairpinMode == componentconfig.HairpinVeth, + utilexec.New(), + kubecontainer.RealOS{}, + imageBackOff, + kubeCfg.SerializeImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), + kubeCfg.RuntimeRequestTimeout.Duration, + ) + if err != nil { + return nil, err + } + klet.containerRuntime = runtime + klet.runner = kubecontainer.DirectStreamingRunner(runtime) + default: + return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime) } - case "rkt": - // TODO: Include hairpin mode settings in rkt? - conf := &rkt.Config{ - Path: kubeCfg.RktPath, - Stage1Image: kubeCfg.RktStage1Image, - InsecureOptions: "image,ondisk", - } - rktRuntime, err := rkt.New( - kubeCfg.RktAPIEndpoint, - conf, - klet, - kubeDeps.Recorder, - containerRefManager, - klet.podManager, - klet.livenessManager, - klet.httpClient, - klet.networkPlugin, - klet.hairpinMode == componentconfig.HairpinVeth, - utilexec.New(), - kubecontainer.RealOS{}, - imageBackOff, - kubeCfg.SerializeImagePulls, - float32(kubeCfg.RegistryPullQPS), - int(kubeCfg.RegistryBurst), - kubeCfg.RuntimeRequestTimeout.Duration, - ) - if err != nil { - return nil, err - } - klet.containerRuntime = rktRuntime - klet.runner = kubecontainer.DirectStreamingRunner(rktRuntime) - case "remote": - remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) - if err != nil { - return nil, err - } - remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) - if err != nil { - return nil, err - } - runtime, err := kuberuntime.NewKubeGenericRuntimeManager( - kubecontainer.FilterEventRecorder(kubeDeps.Recorder), - klet.livenessManager, - containerRefManager, - machineInfo, - klet.podManager, - kubeDeps.OSInterface, - klet.networkPlugin, - klet, - klet.httpClient, - imageBackOff, - kubeCfg.SerializeImagePulls, - float32(kubeCfg.RegistryPullQPS), - int(kubeCfg.RegistryBurst), - klet.cpuCFSQuota, - remoteRuntimeService, - remoteImageService, - ) - if err != nil { - return nil, err - } - klet.containerRuntime = runtime - klet.runner = runtime - default: - return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime) } // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency