extract PreInitRuntimeService from NewMainKubelet

This commit is contained in:
louisgong 2019-12-14 13:26:01 +08:00
parent 0dd468039d
commit e8e1cc9ee0
2 changed files with 102 additions and 96 deletions

View File

@ -757,6 +757,17 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
klog.Warning(err) klog.Warning(err)
} }
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
kubeDeps, &s.ContainerRuntimeOptions,
s.ContainerRuntime,
s.RuntimeCgroups,
s.RemoteRuntimeEndpoint,
s.RemoteImageEndpoint,
s.NonMasqueradeCIDR)
if err != nil {
return err
}
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil { if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err return err
} }
@ -1062,7 +1073,6 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
kubeDeps, kubeDeps,
&kubeServer.ContainerRuntimeOptions, &kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime, kubeServer.ContainerRuntime,
kubeServer.RuntimeCgroups,
kubeServer.HostnameOverride, kubeServer.HostnameOverride,
kubeServer.NodeIP, kubeServer.NodeIP,
kubeServer.ProviderID, kubeServer.ProviderID,
@ -1072,8 +1082,6 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
kubeServer.RegisterNode, kubeServer.RegisterNode,
kubeServer.RegisterWithTaints, kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls, kubeServer.AllowedUnsafeSysctls,
kubeServer.RemoteRuntimeEndpoint,
kubeServer.RemoteImageEndpoint,
kubeServer.ExperimentalMounterPath, kubeServer.ExperimentalMounterPath,
kubeServer.ExperimentalKernelMemcgNotification, kubeServer.ExperimentalKernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount, kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
@ -1083,7 +1091,6 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
kubeServer.MaxContainerCount, kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace, kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable, kubeServer.RegisterSchedulable,
kubeServer.NonMasqueradeCIDR,
kubeServer.KeepTerminatedPodVolumes, kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels, kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot, kubeServer.SeccompProfileRoot,
@ -1138,7 +1145,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *kubelet.Dependencies, kubeDeps *kubelet.Dependencies,
crOptions *config.ContainerRuntimeOptions, crOptions *config.ContainerRuntimeOptions,
containerRuntime string, containerRuntime string,
runtimeCgroups string,
hostnameOverride string, hostnameOverride string,
nodeIP string, nodeIP string,
providerID string, providerID string,
@ -1148,8 +1154,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
registerNode bool, registerNode bool,
registerWithTaints []api.Taint, registerWithTaints []api.Taint,
allowedUnsafeSysctls []string, allowedUnsafeSysctls []string,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
experimentalMounterPath string, experimentalMounterPath string,
experimentalKernelMemcgNotification bool, experimentalKernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool, experimentalCheckNodeCapabilitiesBeforeMount bool,
@ -1159,7 +1163,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
maxContainerCount int32, maxContainerCount int32,
masterServiceNamespace string, masterServiceNamespace string,
registerSchedulable bool, registerSchedulable bool,
nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool, keepTerminatedPodVolumes bool,
nodeLabels map[string]string, nodeLabels map[string]string,
seccompProfileRoot string, seccompProfileRoot string,
@ -1172,7 +1175,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps, kubeDeps,
crOptions, crOptions,
containerRuntime, containerRuntime,
runtimeCgroups,
hostnameOverride, hostnameOverride,
nodeIP, nodeIP,
providerID, providerID,
@ -1182,8 +1184,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
registerNode, registerNode,
registerWithTaints, registerWithTaints,
allowedUnsafeSysctls, allowedUnsafeSysctls,
remoteRuntimeEndpoint,
remoteImageEndpoint,
experimentalMounterPath, experimentalMounterPath,
experimentalKernelMemcgNotification, experimentalKernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount, experimentalCheckNodeCapabilitiesBeforeMount,
@ -1193,7 +1193,6 @@ func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
maxContainerCount, maxContainerCount,
masterServiceNamespace, masterServiceNamespace,
registerSchedulable, registerSchedulable,
nonMasqueradeCIDR,
keepTerminatedPodVolumes, keepTerminatedPodVolumes,
nodeLabels, nodeLabels,
seccompProfileRoot, seccompProfileRoot,

View File

@ -266,6 +266,10 @@ type Dependencies struct {
KubeletConfigController *kubeletconfig.Controller KubeletConfigController *kubeletconfig.Controller
RemoteRuntimeService internalapi.RuntimeService RemoteRuntimeService internalapi.RuntimeService
RemoteImageService internalapi.ImageManagerService RemoteImageService internalapi.ImageManagerService
criHandler http.Handler
dockerLegacyService dockershim.DockerLegacyService
// remove it after cadvisor.UsingLegacyCadvisorStats dropped.
useLegacyCadvisorStats bool
} }
// makePodSourceConfig creates a config.PodConfig from the given // makePodSourceConfig creates a config.PodConfig from the given
@ -319,13 +323,90 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
return cfg, nil return cfg, nil
} }
// PreInitRuntimeService will init runtime service before RunKubelet.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
runtimeCgroups string,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
nonMasqueradeCIDR string) error {
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
// TODO: These need to become arguments to a standalone docker shim.
pluginSettings := dockershim.NetworkPluginSettings{
HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
NonMasqueradeCIDR: nonMasqueradeCIDR,
PluginName: crOptions.NetworkPluginName,
PluginConfDir: crOptions.CNIConfDir,
PluginBinDirString: crOptions.CNIBinDir,
PluginCacheDir: crOptions.CNICacheDir,
MTU: int(crOptions.NetworkPluginMTU),
}
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
return err
}
if crOptions.RedirectContainerStreaming {
kubeDeps.criHandler = ds
}
// The unix socket for kubelet <-> dockershim communication, dockershim start before runtime service init.
klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := dockerServer.Start(); err != nil {
return err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return err
}
if !supported {
kubeDeps.dockerLegacyService = ds
}
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
var err error
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
return err
}
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
return err
}
kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint)
return nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules. // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here. // No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies, kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions, crOptions *config.ContainerRuntimeOptions,
containerRuntime string, containerRuntime string,
runtimeCgroups string,
hostnameOverride string, hostnameOverride string,
nodeIP string, nodeIP string,
providerID string, providerID string,
@ -335,8 +416,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
registerNode bool, registerNode bool,
registerWithTaints []api.Taint, registerWithTaints []api.Taint,
allowedUnsafeSysctls []string, allowedUnsafeSysctls []string,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
experimentalMounterPath string, experimentalMounterPath string,
experimentalKernelMemcgNotification bool, experimentalKernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool, experimentalCheckNodeCapabilitiesBeforeMount bool,
@ -346,7 +425,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
maxContainerCount int32, maxContainerCount int32,
masterServiceNamespace string, masterServiceNamespace string,
registerSchedulable bool, registerSchedulable bool,
nonMasqueradeCIDR string,
keepTerminatedPodVolumes bool, keepTerminatedPodVolumes bool,
nodeLabels map[string]string, nodeLabels map[string]string,
seccompProfileRoot string, seccompProfileRoot string,
@ -589,82 +667,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet) klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
if remoteRuntimeEndpoint != "" {
// remoteImageEndpoint is same as remoteRuntimeEndpoint if not explicitly specified
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}
// TODO: These need to become arguments to a standalone docker shim.
pluginSettings := dockershim.NetworkPluginSettings{
HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
NonMasqueradeCIDR: nonMasqueradeCIDR,
PluginName: crOptions.NetworkPluginName,
PluginConfDir: crOptions.CNIConfDir,
PluginBinDirString: crOptions.CNIBinDir,
PluginCacheDir: crOptions.CNICacheDir,
MTU: int(crOptions.NetworkPluginMTU),
}
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration) klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
// if left at nil, that means it is unneeded klet.dockerLegacyService = kubeDeps.dockerLegacyService
var legacyLogProvider kuberuntime.LegacyLogProvider klet.criHandler = kubeDeps.criHandler
klet.runtimeService = kubeDeps.RemoteRuntimeService
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
return nil, err
}
if crOptions.RedirectContainerStreaming {
klet.criHandler = ds
}
// The unix socket for kubelet <-> dockershim communication.
klog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
// Create dockerLegacyService when the logging driver is not supported.
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
case kubetypes.RemoteContainerRuntime:
// No-op.
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService := kubeDeps.RemoteRuntimeService, kubeDeps.RemoteImageService
if runtimeService == nil {
runtimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
}
if imageService == nil {
imageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
}
klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil { if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient) klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
@ -687,10 +694,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
int(kubeCfg.RegistryBurst), int(kubeCfg.RegistryBurst),
kubeCfg.CPUCFSQuota, kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod, kubeCfg.CPUCFSQuotaPeriod,
runtimeService, kubeDeps.RemoteRuntimeService,
imageService, kubeDeps.RemoteImageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(), kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider, kubeDeps.dockerLegacyService,
klet.runtimeClassManager, klet.runtimeClassManager,
) )
if err != nil { if err != nil {
@ -706,7 +713,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
} }
klet.runtimeCache = runtimeCache klet.runtimeCache = runtimeCache
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) { if kubeDeps.useLegacyCadvisorStats {
klet.StatsProvider = stats.NewCadvisorStatsProvider( klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor, klet.cadvisor,
klet.resourceAnalyzer, klet.resourceAnalyzer,
@ -720,8 +727,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.resourceAnalyzer, klet.resourceAnalyzer,
klet.podManager, klet.podManager,
klet.runtimeCache, klet.runtimeCache,
runtimeService, kubeDeps.RemoteRuntimeService,
imageService, kubeDeps.RemoteImageService,
stats.NewLogMetricsService(), stats.NewLogMetricsService(),
kubecontainer.RealOS{}) kubecontainer.RealOS{})
} }