Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-11-03 23:40:03 +00:00)

Commit: Alpha implementation of the Dynamic Kubelet Configuration feature. See the proposal doc in #29459.
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package app makes it easy to create a kubelet server for various contexts.
package app

import (
	"crypto/tls"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"time"

	"github.com/golang/glog"
	"github.com/spf13/cobra"
	"github.com/spf13/pflag"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/server/healthz"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientgoclientset "k8s.io/client-go/kubernetes"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
	certutil "k8s.io/client-go/util/cert"
	"k8s.io/kubernetes/cmd/kubelet/app/options"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
	"k8s.io/kubernetes/pkg/capabilities"
	"k8s.io/kubernetes/pkg/client/chaosclient"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/credentialprovider"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	"k8s.io/kubernetes/pkg/kubelet/certificate"
	"k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/dockershim"
	"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
	dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
	"k8s.io/kubernetes/pkg/kubelet/server"
	"k8s.io/kubernetes/pkg/kubelet/server/streaming"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/util/configz"
	"k8s.io/kubernetes/pkg/util/flock"
	kubeio "k8s.io/kubernetes/pkg/util/io"
	"k8s.io/kubernetes/pkg/util/mount"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	"k8s.io/kubernetes/pkg/util/oom"
	"k8s.io/kubernetes/pkg/util/rlimit"
	"k8s.io/kubernetes/pkg/version"
)

const (
	// Kubelet component name
	componentKubelet = "kubelet"
)

// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	// ignore the error, as this is just for generating docs and the like
	s, _ := options.NewKubeletServer()
	s.AddFlags(pflag.CommandLine)
	cmd := &cobra.Command{
		Use: componentKubelet,
		Long: `The kubelet is the primary "node agent" that runs on each
node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
that describes a pod. The kubelet takes a set of PodSpecs that are provided through
various mechanisms (primarily through the apiserver) and ensures that the containers
described in those PodSpecs are running and healthy. The kubelet doesn't manage
containers which were not created by Kubernetes.

Other than from a PodSpec from the apiserver, there are three ways that a container
manifest can be provided to the Kubelet.

File: Path passed as a flag on the command line. Files under this path will be monitored
periodically for updates. The monitoring period is 20s by default and is configurable
via a flag.

HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
is checked every 20 seconds (also configurable with a flag).

HTTP server: The kubelet can also listen for HTTP and respond to a simple API
(underspec'd currently) to submit a new manifest.`,
		Run: func(cmd *cobra.Command, args []string) {
		},
	}

	return cmd
}
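
// Illustrative usage sketch (not part of the original file): the Run function
// above is intentionally a no-op, since this command object is built mainly for
// flag registration and doc generation; a binary embedding it could still execute
// it roughly like this, though the real kubelet entry point wires flags and
// configuration differently.
//
//	func main() {
//		cmd := NewKubeletCommand()
//		if err := cmd.Execute(); err != nil {
//			fmt.Fprintln(os.Stderr, err)
//			os.Exit(1)
//		}
//	}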

// UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
// is not valid. It will not start any background processes, and does not include authentication/authorization
func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
	// Initialize the TLS Options
	tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
	if err != nil {
		return nil, err
	}

	mounter := mount.New(s.ExperimentalMounterPath)
	var writer kubeio.Writer = &kubeio.StdWriter{}
	if s.Containerized {
		glog.V(2).Info("Running kubelet in containerized mode (experimental)")
		mounter = mount.NewNsenterMounter()
		writer = &kubeio.NsenterWriter{}
	}

	var dockerClient libdocker.Interface
	if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
		dockerClient = libdocker.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration,
			s.ImagePullProgressDeadline.Duration)
	} else {
		dockerClient = nil
	}

	return &kubelet.Dependencies{
		Auth:               nil, // default does not enforce auth[nz]
		CAdvisorInterface:  nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
		Cloud:              nil, // cloud provider might start background processes
		ContainerManager:   nil,
		DockerClient:       dockerClient,
		KubeClient:         nil,
		ExternalKubeClient: nil,
		EventClient:        nil,
		Mounter:            mounter,
		NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
		OOMAdjuster:        oom.NewOOMAdjuster(),
		OSInterface:        kubecontainer.RealOS{},
		Writer:             writer,
		VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
		TLSOptions:         tlsOptions,
	}, nil
}
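
// Illustrative wiring sketch (assumption, not part of the original file): callers
// that do not construct their own Dependencies typically pair this function with
// Run, roughly as follows; passing nil to Run has the same effect, because run()
// calls UnsecuredDependencies itself when the argument is nil.
//
//	s, err := options.NewKubeletServer()
//	if err != nil {
//		glog.Fatal(err)
//	}
//	kubeDeps, err := UnsecuredDependencies(s)
//	if err != nil {
//		glog.Fatal(err)
//	}
//	if err := Run(s, kubeDeps); err != nil {
//		glog.Fatal(err)
//	}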

// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
	if err := run(s, kubeDeps); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}

func checkPermissions() error {
	if uid := os.Getuid(); uid != 0 {
		return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
	}
	// TODO: Check if kubelet is running in the `initial` user namespace.
	// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
	return nil
}

func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
	tmp := componentconfigv1alpha1.KubeletConfiguration{}
	api.Scheme.Convert(kc, &tmp, nil)
	cz.Set(tmp)
}

func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
	cz, err := configz.New("componentconfig")
	if err == nil {
		setConfigz(cz, kc)
	} else {
		glog.Errorf("unable to register configz: %s", err)
	}
	return cz, err
}
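
// Illustrative note (assumption, not part of the original file): once registered
// under the "componentconfig" key, the running KubeletConfiguration can typically
// be inspected through the kubelet's /configz endpoint, for example via the API
// server proxy:
//
//	kubectl get --raw /api/v1/nodes/<node-name>/proxy/configz
//
// The exact port, path and authentication requirements depend on how the kubelet
// server is configured, so treat this only as a sketch.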

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(s *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	eventBroadcaster := record.NewBroadcaster()
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	eventBroadcaster.StartLogging(glog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		glog.V(4).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		glog.Warning("No api server defined - no events will be sent to API server.")
	}
}

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer
	err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}
	// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
	if err := options.ValidateKubeletServer(s); err != nil {
		return err
	}

	// Obtain Kubelet Lock File
	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}
	done := make(chan struct{})
	if s.LockFilePath != "" {
		glog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			glog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Register current configuration with /configz endpoint
	_, err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		glog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	// About to get clients and such, detect standaloneMode
	standaloneMode := true
	switch {
	case s.RequireKubeConfig == true:
		standaloneMode = false
		glog.Warningf("--require-kubeconfig is deprecated. Set --kubeconfig without using --require-kubeconfig.")
	case s.KubeConfig.Provided():
		standaloneMode = false
	}

	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s)
		if err != nil {
			return err
		}
	}

	if kubeDeps.Cloud == nil {
		if !cloudprovider.IsExternal(s.CloudProvider) && s.CloudProvider != componentconfigv1alpha1.AutoDetectCloudProvider {
			cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud == nil {
				glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			} else {
				glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			}
			kubeDeps.Cloud = cloud
		}
	}

	nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
	if err != nil {
		return err
	}

	if s.BootstrapKubeconfig != "" {
		if err := bootstrap.LoadClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
			return err
		}
	}

	// if in standalone mode, indicate as much by setting all clients to nil
	if standaloneMode {
		kubeDeps.KubeClient = nil
		kubeDeps.ExternalKubeClient = nil
		kubeDeps.EventClient = nil
		glog.Warningf("standalone mode, no API client")
	} else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil {
		// initialize clients if not standalone mode and any of the clients are not provided
		var kubeClient clientset.Interface
		var eventClient v1core.EventsGetter
		var externalKubeClient clientgoclientset.Interface

		clientConfig, err := CreateAPIServerClientConfig(s)

		var clientCertificateManager certificate.Manager
		if err == nil {
			if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
				clientCertificateManager, err = certificate.NewKubeletClientCertificateManager(s.CertDirectory, nodeName, clientConfig.CertData, clientConfig.KeyData, clientConfig.CertFile, clientConfig.KeyFile)
				if err != nil {
					return err
				}
				if err := certificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager); err != nil {
					return err
				}
			}

			kubeClient, err = clientset.NewForConfig(clientConfig)
			if err != nil {
				glog.Warningf("New kubeClient from clientConfig error: %v", err)
			} else if kubeClient.Certificates() != nil && clientCertificateManager != nil {
				glog.V(2).Info("Starting client certificate rotation.")
				clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.Certificates().CertificateSigningRequests())
				clientCertificateManager.Start()
			}
			externalKubeClient, err = clientgoclientset.NewForConfig(clientConfig)
			if err != nil {
				glog.Warningf("New kubeClient from clientConfig error: %v", err)
			}
			// make a separate client for events
			eventClientConfig := *clientConfig
			eventClientConfig.QPS = float32(s.EventRecordQPS)
			eventClientConfig.Burst = int(s.EventBurst)
			tmpClient, err := clientgoclientset.NewForConfig(&eventClientConfig)
			if err != nil {
				glog.Warningf("Failed to create API Server client for Events: %v", err)
			}
			eventClient = tmpClient.CoreV1()

		} else {
			switch {
			case s.RequireKubeConfig:
				return fmt.Errorf("invalid kubeconfig: %v", err)
			case s.KubeConfig.Provided():
				glog.Warningf("invalid kubeconfig: %v", err)
			}
		}

		kubeDeps.KubeClient = kubeClient
		kubeDeps.ExternalKubeClient = externalKubeClient
		if eventClient != nil {
			kubeDeps.EventClient = eventClient
		}
	}

	// Alpha Dynamic Configuration Implementation;
	// if the kubelet config controller is available, inject the latest to start the config and status sync loops
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
		kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, string(nodeName))
	}

	if kubeDeps.Auth == nil {
		auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
	}

	if kubeDeps.CAdvisorInterface == nil {
		kubeDeps.CAdvisorInterface, err = cadvisor.New(s.Address, uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
	makeEventRecorder(&s.KubeletConfiguration, kubeDeps, nodeName)

	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
			s.CgroupRoot = "/"
		}
		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, "", "", "")
			if err != nil {
				return err
			}
		}
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.ExperimentalQOSReserved)
		if err != nil {
			return err
		}
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				ExperimentalQOSReserved: *experimentalQOSReserved,
			},
			s.FailSwapOn,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}
	}

	if err := checkPermissions(); err != nil {
		glog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	rand.Seed(time.Now().UTC().UnixNano())

	// TODO(vmarmol): Do this through container config.
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		glog.Warning(err)
	}

	if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	if s.HealthzPort > 0 {
		healthz.DefaultHealthz()
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
			if err != nil {
				glog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	<-done
	return nil
}

// getNodeName returns the node name according to the cloud provider
// if cloud provider is specified. Otherwise, returns the hostname of the node.
func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
	if cloud == nil {
		return types.NodeName(hostname), nil
	}

	instances, ok := cloud.Instances()
	if !ok {
		return "", fmt.Errorf("failed to get instances from cloud provider")
	}

	nodeName, err := instances.CurrentNodeName(hostname)
	if err != nil {
		return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
	}

	glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)

	return nodeName, nil
}

// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(kf *options.KubeletFlags, kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
		kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
		kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")

		canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
		if err != nil {
			return nil, err
		}
		if !canReadCertAndKey {
			cert, key, err := certutil.GenerateSelfSignedCertKey(nodeutil.GetHostname(kf.HostnameOverride), nil, nil)
			if err != nil {
				return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
			}

			if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
				return nil, err
			}

			if err := certutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
				return nil, err
			}

			glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
		}
	}
	tlsOptions := &server.TLSOptions{
		Config: &tls.Config{
			// Can't use SSLv3 because of POODLE and BEAST
			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
			// Can't use TLSv1.1 because of RC4 cipher usage
			MinVersion: tls.VersionTLS12,
		},
		CertFile: kc.TLSCertFile,
		KeyFile:  kc.TLSPrivateKeyFile,
	}

	if len(kc.Authentication.X509.ClientCAFile) > 0 {
		clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
		if err != nil {
			return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
		}
		// Specify allowed CAs for client certificates
		tlsOptions.Config.ClientCAs = clientCAs
		// Populate PeerCertificates in requests, but don't reject connections without verified certificates
		tlsOptions.Config.ClientAuth = tls.RequestClientCert
	}

	return tlsOptions, nil
}
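
// Illustrative note (assumption, not part of the original file): when server
// certificate rotation is disabled and no --tls-cert-file/--tls-private-key-file
// flags are set, the code above self-signs a certificate for the node's hostname
// and writes it under the certificate directory (--cert-dir), e.g.:
//
//	<cert-dir>/kubelet.crt
//	<cert-dir>/kubelet.key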

func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
		&clientcmd.ConfigOverrides{},
	).ClientConfig()
}

// createClientConfig creates a client configuration from the command line arguments.
// If --kubeconfig is explicitly set, it will be used. If it is not set but
// --require-kubeconfig=true, we attempt to load the default kubeconfig file.
func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
	// If --kubeconfig was not provided, it will have a default path set in cmd/kubelet/app/options/options.go.
	// We only use that default path when --require-kubeconfig=true. The default path is temporary until --require-kubeconfig is removed.
	// TODO(#41161:v1.10.0): Remove the default kubeconfig path and --require-kubeconfig.
	if s.BootstrapKubeconfig != "" || s.KubeConfig.Provided() || s.RequireKubeConfig == true {
		return kubeconfigClientConfig(s)
	} else {
		return nil, fmt.Errorf("createClientConfig called in standalone mode")
	}
}

// CreateAPIServerClientConfig generates a client.Config from command line flags
// via createClientConfig and then injects chaos into the configuration via addChaosToClientConfig.
// This func is exported to support integration with third party kubelet extensions (e.g. kubernetes-mesos).
func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
	clientConfig, err := createClientConfig(s)
	if err != nil {
		return nil, err
	}

	clientConfig.ContentType = s.ContentType
	// Override kubeconfig qps/burst settings from flags
	clientConfig.QPS = float32(s.KubeAPIQPS)
	clientConfig.Burst = int(s.KubeAPIBurst)

	addChaosToClientConfig(s, clientConfig)
	return clientConfig, nil
}

// addChaosToClientConfig injects random errors into client connections if configured.
func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config) {
	if s.ChaosChance != 0.0 {
		config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
			seed := chaosclient.NewSeed(1)
			// TODO: introduce a standard chaos package with more tunables - this is just a proof of concept
			// TODO: introduce random latency and stalls
			return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer))
		}
	}
}
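
// Illustrative note (assumption, not part of the original file): a non-zero chaos
// chance, e.g. starting the kubelet with something like
//
//	kubelet --chaos-chance=0.1 ...
//
// wraps the API server transport so that roughly that fraction of requests fail
// with a simulated "connection reset by peer"; as the TODOs above say, this is a
// proof of concept intended only for resilience testing.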

// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
	// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// Setup event recorder if required.
	makeEventRecorder(kubeCfg, kubeDeps, nodeName)

	// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
	//                so that I could remove the associated fields from KubeletConfig. I would
	//                prefer this to be done as part of an independent validation step on the
	//                KubeletConfiguration. But as far as I can tell, we don't have an explicit
	//                place for validation of the KubeletConfiguration yet.
	hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
	if err != nil {
		return err
	}

	hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
	if err != nil {
		return err
	}

	hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
	if err != nil {
		return err
	}

	privilegedSources := capabilities.PrivilegedSources{
		HostNetworkSources: hostNetworkSources,
		HostPIDSources:     hostPIDSources,
		HostIPCSources:     hostIPCSources,
	}
	capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)

	credentialprovider.SetPreferredDockercfgPath(kubeFlags.RootDirectory)
	glog.V(2).Infof("Using root directory: %v", kubeFlags.RootDirectory)

	builder := kubeDeps.Builder
	if builder == nil {
		builder = CreateAndInitKubelet
	}
	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}

	k, err := builder(kubeCfg, kubeDeps, &kubeFlags.ContainerRuntimeOptions, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.ProviderID, kubeFlags.CloudProvider, kubeFlags.CertDirectory, kubeFlags.RootDirectory)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

	rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		glog.Infof("Started kubelet %s as runonce", version.Get().String())
	} else {
		startKubelet(k, podCfg, kubeCfg, kubeDeps)
		glog.Infof("Started kubelet %s", version.Get().String())
	}
	return nil
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies) {
	// start the kubelet
	go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

	// start the kubelet server
	if kubeCfg.EnableServer {
		go wait.Until(func() {
			k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
		}, 0, wait.NeverStop)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go wait.Until(func() {
			k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
		}, 0, wait.NeverStop)
	}
}
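
// Illustrative note (assumption, not part of the original file): with the usual
// defaults this starts the authenticated kubelet API on kubeCfg.Port (historically
// 10250) and, when kubeCfg.ReadOnlyPort is non-zero (historically 10255), an
// additional unauthenticated read-only listener; each server goroutine is
// restarted by wait.Until if it ever returns.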

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration,
	kubeDeps *kubelet.Dependencies,
	crOptions *options.ContainerRuntimeOptions,
	hostnameOverride,
	nodeIP,
	providerID,
	cloudProvider,
	certDirectory,
	rootDirectory string) (k kubelet.Bootstrap, err error) {

	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, hostnameOverride, nodeIP, providerID, cloudProvider, certDirectory, rootDirectory)
	if err != nil {
		return nil, err
	}

	k.BirthCry()

	k.StartGarbageCollection()

	return k, nil
}

// parseResourceList parses the given configuration map into an API
// ResourceList or returns an error.
func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, error) {
	if len(m) == 0 {
		return nil, nil
	}
	rl := make(v1.ResourceList)
	for k, v := range m {
		switch v1.ResourceName(k) {
		// CPU, memory and local storage resources are supported.
		case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceStorage:
			q, err := resource.ParseQuantity(v)
			if err != nil {
				return nil, err
			}
			if q.Sign() == -1 {
				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
			}
			// storage specified in configuration map is mapped to ResourceStorageScratch API
			if v1.ResourceName(k) == v1.ResourceStorage {
				rl[v1.ResourceStorageScratch] = q
			} else {
				rl[v1.ResourceName(k)] = q
			}
		default:
			return nil, fmt.Errorf("cannot reserve %q resource", k)
		}
	}
	return rl, nil
}
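
// Illustrative example (assumption, not part of the original file): the maps fed
// into parseResourceList come from flags such as --kube-reserved and
// --system-reserved, so a setting like
//
//	--kube-reserved=cpu=500m,memory=1Gi,storage=1Gi
//
// yields a v1.ResourceList with "cpu" and "memory" entries and the "storage" key
// remapped to v1.ResourceStorageScratch, while any other resource name or a
// negative quantity causes an error.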

// BootstrapKubeletConfigController constructs and bootstraps a configuration controller
func BootstrapKubeletConfigController(flags *options.KubeletFlags,
	defaultConfig *componentconfig.KubeletConfiguration) (*componentconfig.KubeletConfiguration, *kubeletconfig.Controller, error) {
	var err error
	// Alpha Dynamic Configuration Implementation; this section only loads config from disk, it does not contact the API server
	// compute absolute paths based on current working dir
	initConfigDir := ""
	if flags.InitConfigDir.Provided() {
		initConfigDir, err = filepath.Abs(flags.InitConfigDir.Value())
		if err != nil {
			return nil, nil, fmt.Errorf("failed to get absolute path for --init-config-dir")
		}
	}
	dynamicConfigDir := ""
	if flags.DynamicConfigDir.Provided() {
		dynamicConfigDir, err = filepath.Abs(flags.DynamicConfigDir.Value())
		if err != nil {
			return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir")
		}
	}

	// get the latest KubeletConfiguration checkpoint from disk, or load the init or default config if no valid checkpoints exist
	kubeletConfigController := kubeletconfig.NewController(initConfigDir, dynamicConfigDir, defaultConfig)
	kubeletConfig, err := kubeletConfigController.Bootstrap()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
	}
	return kubeletConfig, kubeletConfigController, nil
}
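
// Illustrative note (assumption, not part of the original file): in the alpha
// Dynamic Kubelet Configuration flow, a caller first uses this function to pick
// the on-disk configuration (the --init-config-dir seed, a --dynamic-config-dir
// checkpoint, or the built-in defaults), then hands the returned controller to
// run() via kubeDeps.KubeletConfigController so StartSync can begin watching for
// new checkpoints, roughly:
//
//	kubeletConfig, configController, err := BootstrapKubeletConfigController(flags, defaultConfig)
//	if err != nil {
//		glog.Fatal(err)
//	}
//	// build the KubeletServer from kubeletConfig, set
//	// kubeDeps.KubeletConfigController = configController, then call Run(s, kubeDeps)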

// RunDockershim only starts the dockershim in the current process. This is only used for CRI validation testing purposes.
// TODO(random-liu): Move this to a separate binary.
func RunDockershim(c *componentconfig.KubeletConfiguration, r *options.ContainerRuntimeOptions) error {
	// Create docker client.
	dockerClient := libdocker.ConnectToDockerOrDie(r.DockerEndpoint, c.RuntimeRequestTimeout.Duration,
		r.ImagePullProgressDeadline.Duration)

	// Initialize network plugin settings.
	binDir := r.CNIBinDir
	if binDir == "" {
		binDir = r.NetworkPluginDir
	}
	nh := &kubelet.NoOpLegacyHost{}
	pluginSettings := dockershim.NetworkPluginSettings{
		HairpinMode:       componentconfig.HairpinMode(c.HairpinMode),
		NonMasqueradeCIDR: c.NonMasqueradeCIDR,
		PluginName:        r.NetworkPluginName,
		PluginConfDir:     r.CNIConfDir,
		PluginBinDir:      binDir,
		MTU:               int(r.NetworkPluginMTU),
		LegacyRuntimeHost: nh,
	}

	// Initialize streaming configuration. (Not using TLS now)
	streamingConfig := &streaming.Config{
		// Use a relative redirect (no scheme or host).
		BaseURL:                         &url.URL{Path: "/cri/"},
		StreamIdleTimeout:               c.StreamingConnectionIdleTimeout.Duration,
		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
	}

	ds, err := dockershim.NewDockerService(dockerClient, c.SeccompProfileRoot, r.PodSandboxImage,
		streamingConfig, &pluginSettings, c.RuntimeCgroups, c.CgroupDriver, r.DockerExecHandlerName, r.DockershimRootDirectory,
		r.DockerDisableSharedPID)
	if err != nil {
		return err
	}
	if err := ds.Start(); err != nil {
		return err
	}

	glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
	server := dockerremote.NewDockerServer(c.RemoteRuntimeEndpoint, ds)
	if err := server.Start(); err != nil {
		return err
	}

	// Start the streaming server
	addr := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
	return http.ListenAndServe(addr, ds)
}