CRI: rearrange kubelet runtime initialization

Consolidate the code used by docker+cri and remote+cri for consistency, and to
prevent changing one without the other.  Enforce that
`--experimental-runtime-integration-type` has to be set in order for the kubelet
to use the CRI interface, *even for out-of-process shims*. This simplifies the
temporary `if` logic in kubelet while CRI still co-exists with older logic.
This commit is contained in:
Yu-Ju Hong 2016-11-03 17:06:04 -07:00
parent f787cea237
commit 5a148ee6e9

View File

@ -253,6 +253,18 @@ func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps
return cfg, nil return cfg, nil
} }
// getRuntimeAndImageServices creates the gRPC-backed CRI runtime and image
// services from the remote endpoints configured in the kubelet configuration.
// Both clients share the same request timeout. Returns an error if either
// remote service cannot be created.
func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (internalApi.RuntimeService, internalApi.ImageManagerService, error) {
	rs, err := remote.NewRemoteRuntimeService(config.RemoteRuntimeEndpoint, config.RuntimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	is, err := remote.NewRemoteImageService(config.RemoteImageEndpoint, config.RuntimeRequestTimeout.Duration)
	if err != nil {
		return nil, nil, err
	}
	// err is necessarily nil here; return the literal nil so the success
	// path is unambiguous (idiomatic Go: (T, error) returns nil on success).
	return rs, is, nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here. // No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) { func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
@ -502,55 +514,75 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
var nl *noOpLegacyHost var nl *noOpLegacyHost
pluginSettings.LegacyRuntimeHost = nl pluginSettings.LegacyRuntimeHost = nl
// Initialize the runtime. if kubeCfg.ExperimentalRuntimeIntegrationType == "cri" {
switch kubeCfg.ContainerRuntime {
case "docker":
switch kubeCfg.ExperimentalRuntimeIntegrationType {
case "cri":
// Use the new CRI shim for docker. This is needed for testing the
// docker integration through CRI, and may be removed in the future.
dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups)
if err != nil {
return nil, err
}
runtimeService := dockerService.(internalApi.RuntimeService)
imageService := dockerService.(internalApi.ImageManagerService)
// This is a temporary knob to easily switch between grpc and non-grpc integration. grpc
// will be enabled if this is not empty.
// TODO(random-liu): Remove the temporary knob after grpc integration is stabilized and
// pass the runtime endpoint through kubelet flags.
remoteEndpoint := "/var/run/dockershim.sock"
// If the remote runtime endpoint is set, use the grpc integration.
if remoteEndpoint != "" {
// Start the in process dockershim grpc server.
server := dockerremote.NewDockerServer(remoteEndpoint, dockerService)
err := server.Start()
if err != nil {
return nil, err
}
// Start the remote kuberuntime manager.
runtimeService, err = remote.NewRemoteRuntimeService(remoteEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
imageService, err = remote.NewRemoteImageService(remoteEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
}
// TODO: Find a better place to start the service.
if err := dockerService.Start(); err != nil {
return nil, err
}
// kubelet defers to the runtime shim to setup networking. Setting // kubelet defers to the runtime shim to setup networking. Setting
// this to nil will prevent it from trying to invoke the plugin. // this to nil will prevent it from trying to invoke the plugin.
// It's easier to always probe and initialize plugins till cri // It's easier to always probe and initialize plugins till cri
// becomes the default. // becomes the default.
klet.networkPlugin = nil klet.networkPlugin = nil
var runtimeService internalApi.RuntimeService
var imageService internalApi.ImageManagerService
var err error
switch kubeCfg.ContainerRuntime {
case "docker":
// Use the new CRI shim for docker.
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups)
if err != nil {
return nil, err
}
// TODO: Once we switch to grpc completely, we should move this
// call to the grpc server start.
if err := ds.Start(); err != nil {
return nil, err
}
rs := ds.(internalApi.RuntimeService)
is := ds.(internalApi.ImageManagerService)
// This is an internal knob to switch between grpc and non-grpc
// integration.
// TODO: Remove this knob once we switch to using GRPC completely.
overGRPC := true
if overGRPC {
const (
// The unix socket for kubelet <-> dockershim communication.
ep = "/var/run/dockershim.sock"
)
kubeCfg.RemoteRuntimeEndpoint = ep
kubeCfg.RemoteImageEndpoint = ep
server := dockerremote.NewDockerServer(ep, ds)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
err := server.Start()
if err != nil {
return nil, err
}
rs, is, err = getRuntimeAndImageServices(kubeCfg)
if err != nil {
return nil, err
}
}
// Use DockerLegacyService directly to work around unimplemented
// functions in CRI.
// TODO: Remove this hack after CRI is fully implemented.
// TODO: Move the instrumented interface wrapping into kuberuntime.
runtimeService = &struct {
internalApi.RuntimeService
dockershim.DockerLegacyService
}{
RuntimeService: kuberuntime.NewInstrumentedRuntimeService(rs),
DockerLegacyService: ds,
}
imageService = is
case "remote":
runtimeService, imageService, err = getRuntimeAndImageServices(kubeCfg)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", kubeCfg.ContainerRuntime)
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager( runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder), kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager, klet.livenessManager,
@ -566,17 +598,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
float32(kubeCfg.RegistryPullQPS), float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst), int(kubeCfg.RegistryBurst),
klet.cpuCFSQuota, klet.cpuCFSQuota,
// Use DockerLegacyService directly to workaround unimplemented functions. runtimeService,
// We add short hack here to keep other code clean.
// TODO: Remove this hack after CRI is fully designed and implemented.
// TODO: Move the instrumented interface wrapping into kuberuntime.
&struct {
internalApi.RuntimeService
dockershim.DockerLegacyService
}{
RuntimeService: kuberuntime.NewInstrumentedRuntimeService(runtimeService),
DockerLegacyService: dockerService,
},
kuberuntime.NewInstrumentedImageManagerService(imageService), kuberuntime.NewInstrumentedImageManagerService(imageService),
) )
if err != nil { if err != nil {
@ -584,8 +606,9 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
} }
klet.containerRuntime = runtime klet.containerRuntime = runtime
klet.runner = runtime klet.runner = runtime
default: } else {
// Only supported one for now, continue. switch kubeCfg.ContainerRuntime {
case "docker":
runtime := dockertools.NewDockerManager( runtime := dockertools.NewDockerManager(
kubeDeps.DockerClient, kubeDeps.DockerClient,
kubecontainer.FilterEventRecorder(kubeDeps.Recorder), kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
@ -620,7 +643,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
) )
klet.containerRuntime = runtime klet.containerRuntime = runtime
klet.runner = kubecontainer.DirectStreamingRunner(runtime) klet.runner = kubecontainer.DirectStreamingRunner(runtime)
}
case "rkt": case "rkt":
// TODO: Include hairpin mode settings in rkt? // TODO: Include hairpin mode settings in rkt?
conf := &rkt.Config{ conf := &rkt.Config{
@ -628,7 +650,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
Stage1Image: kubeCfg.RktStage1Image, Stage1Image: kubeCfg.RktStage1Image,
InsecureOptions: "image,ondisk", InsecureOptions: "image,ondisk",
} }
rktRuntime, err := rkt.New( runtime, err := rkt.New(
kubeCfg.RktAPIEndpoint, kubeCfg.RktAPIEndpoint,
conf, conf,
klet, klet,
@ -650,43 +672,12 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if err != nil { if err != nil {
return nil, err return nil, err
} }
klet.containerRuntime = rktRuntime
klet.runner = kubecontainer.DirectStreamingRunner(rktRuntime)
case "remote":
remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
machineInfo,
klet.podManager,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
klet.cpuCFSQuota,
remoteRuntimeService,
remoteImageService,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime klet.containerRuntime = runtime
klet.runner = runtime klet.runner = kubecontainer.DirectStreamingRunner(runtime)
default: default:
return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime) return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
} }
}
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime) klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)