From 6ddfa512def149ed110490ee2ea161b60b39b392 Mon Sep 17 00:00:00 2001
From: Dawn Chen
Date: Wed, 24 Jun 2015 11:10:10 -0700
Subject: [PATCH 1/2] Revert "Revert "Fix the race between configuring cbr0 and restarting static pods""

This reverts commit fd0a95dd12d0c09e61a09b8d73de1a7799a0246c.
---
 cluster/saltbase/salt/docker/docker-defaults  |  2 +-
 cluster/saltbase/salt/docker/init.sls         |  6 --
 .../saltbase/salt/kube-master-addons/init.sls | 16 ++++
 cluster/saltbase/salt/kubelet/default         |  7 +-
 cmd/kubelet/app/server.go                     |  6 +-
 contrib/mesos/pkg/executor/service/service.go |  1 +
 pkg/kubelet/container_bridge.go               | 75 ++++++++++++++-----
 pkg/kubelet/kubelet.go                        | 55 +++++++++++---
 pkg/kubelet/kubelet_test.go                   |  1 +
 pkg/kubelet/status_manager.go                 |  4 +
 pkg/util/util.go                              | 14 ++++
 11 files changed, 147 insertions(+), 40 deletions(-)

diff --git a/cluster/saltbase/salt/docker/docker-defaults b/cluster/saltbase/salt/docker/docker-defaults
index a8ec4256d5a..f325b4945d5 100644
--- a/cluster/saltbase/salt/docker/docker-defaults
+++ b/cluster/saltbase/salt/docker/docker-defaults
@@ -2,5 +2,5 @@ DOCKER_OPTS=""
 {% if grains.docker_opts is defined and grains.docker_opts %}
 DOCKER_OPTS="${DOCKER_OPTS} {{grains.docker_opts}}"
 {% endif %}
-DOCKER_OPTS="${DOCKER_OPTS} --bridge cbr0 --iptables=false --ip-masq=false"
+DOCKER_OPTS="${DOCKER_OPTS} --bridge=cbr0 --iptables=false --ip-masq=false"
 DOCKER_NOFILE=1000000
diff --git a/cluster/saltbase/salt/docker/init.sls b/cluster/saltbase/salt/docker/init.sls
index 9728ed57293..06d37e26f27 100644
--- a/cluster/saltbase/salt/docker/init.sls
+++ b/cluster/saltbase/salt/docker/init.sls
@@ -48,11 +48,6 @@ net.ipv4.ip_forward:
   sysctl.present:
     - value: 1
 
-cbr0:
-  container_bridge.ensure:
-    - cidr: {{ grains['cbr-cidr'] }}
-    - mtu: 1460
-
 {{ environment_file }}:
   file.managed:
     - source: salt://docker/docker-defaults
@@ -124,7 +119,6 @@ docker:
    - enable: True
    - watch:
      - file: {{ environment_file }}
-     - container_bridge: cbr0
 {% if override_docker_ver != '' %}
    - require:
      - pkg: lxc-docker-{{ override_docker_ver }}
diff --git a/cluster/saltbase/salt/kube-master-addons/init.sls b/cluster/saltbase/salt/kube-master-addons/init.sls
index 91186052205..075a7a05113 100644
--- a/cluster/saltbase/salt/kube-master-addons/init.sls
+++ b/cluster/saltbase/salt/kube-master-addons/init.sls
@@ -37,10 +37,26 @@ master-docker-image-tags:
   file.touch:
     - name: /srv/pillar/docker-images.sls
 
+# Current containervm image by default has both docker and kubelet
+# running. But during cluster creation stage, docker and kubelet
+# could be overwritten completely, or restarted due to flag changes.
+# The ordering of salt states for service docker, kubelet and
+# master-addon below is very important to avoid the race between
+# salt restart docker or kubelet and kubelet start master components.
+# Without the ordering of salt states, when gce instance boot up,
+# configure-vm.sh will run and download the release. At the end of
+# boot, run-salt will run kube-master-addons service which installs
+# master component manifest files to kubelet config directory before
+# the installation of proper version kubelet. Please see
+# https://github.com/GoogleCloudPlatform/kubernetes/issues/10122#issuecomment-114566063
+# for detail explanation on this very issue.
 kube-master-addons:
   service.running:
     - enable: True
     - restart: True
+    - require:
+      - service: docker
+      - service: kubelet
     - watch:
       - file: master-docker-image-tags
       - file: /etc/kubernetes/kube-master-addons.sh
diff --git a/cluster/saltbase/salt/kubelet/default b/cluster/saltbase/salt/kubelet/default
index 76eb4497b4d..a1c1dccc1f0 100644
--- a/cluster/saltbase/salt/kubelet/default
+++ b/cluster/saltbase/salt/kubelet/default
@@ -76,4 +76,9 @@
 {% set cgroup_root = "--cgroup_root=/" -%}
 {% endif -%}
 
-DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}}"
+{% set pod_cidr = "" %}
+{% if grains['roles'][0] == 'kubernetes-master' %}
+  {% set pod_cidr = "--pod-cidr=" + grains['cbr-cidr'] %}
+{% endif %}
+
+DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}}"
diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index b50179b4807..1cd1362da1f 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -115,6 +115,7 @@ type KubeletServer struct {
     DockerDaemonContainer string
     SystemContainer       string
     ConfigureCBR0         bool
+    PodCIDR               string
     MaxPods               int
     DockerExecHandlerName string
 
@@ -241,7 +242,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
     fs.BoolVar(&s.ConfigureCBR0, "configure-cbr0", s.ConfigureCBR0, "If true, kubelet will configure cbr0 based on Node.Spec.PodCIDR.")
     fs.IntVar(&s.MaxPods, "max-pods", 100, "Number of Pods that can run on this Kubelet.")
     fs.StringVar(&s.DockerExecHandlerName, "docker-exec-handler", s.DockerExecHandlerName, "Handler to use when executing a command in a container. Valid values are 'native' and 'nsenter'. Defaults to 'native'.")
-
+    fs.StringVar(&s.PodCIDR, "pod-cidr", "", "The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master.")
     // Flags intended for testing, not recommended used in production environments.
     fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
     fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")
@@ -361,6 +362,7 @@ func (s *KubeletServer) Run(_ []string) error {
         DockerDaemonContainer: s.DockerDaemonContainer,
         SystemContainer:       s.SystemContainer,
         ConfigureCBR0:         s.ConfigureCBR0,
+        PodCIDR:               s.PodCIDR,
         MaxPods:               s.MaxPods,
         DockerExecHandler:     dockerExecHandler,
     }
@@ -714,6 +716,7 @@ type KubeletConfig struct {
     DockerDaemonContainer string
     SystemContainer       string
     ConfigureCBR0         bool
+    PodCIDR               string
     MaxPods               int
     DockerExecHandler     dockertools.ExecHandler
 }
@@ -771,6 +774,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
         kc.DockerDaemonContainer,
         kc.SystemContainer,
         kc.ConfigureCBR0,
+        kc.PodCIDR,
         kc.MaxPods,
         kc.DockerExecHandler)
 
diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go
index dd3b6605b9e..06ffbd6255d 100644
--- a/contrib/mesos/pkg/executor/service/service.go
+++ b/contrib/mesos/pkg/executor/service/service.go
@@ -354,6 +354,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
         kc.DockerDaemonContainer,
         kc.SystemContainer,
         kc.ConfigureCBR0,
+        kc.PodCIDR,
         kc.MaxPods,
         kc.DockerExecHandler,
     )
diff --git a/pkg/kubelet/container_bridge.go b/pkg/kubelet/container_bridge.go
index a4a30a53015..4ef58f4aaee 100644
--- a/pkg/kubelet/container_bridge.go
+++ b/pkg/kubelet/container_bridge.go
@@ -19,15 +19,55 @@ package kubelet
 import (
     "bytes"
     "net"
+    "os"
     "os/exec"
     "regexp"
 
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/golang/glog"
 )
 
 var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`)
 
+func createCBR0(wantCIDR *net.IPNet) error {
+    // recreate cbr0 with wantCIDR
+    if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    // restart docker
+    // For now just log the error. The containerRuntime check will catch docker failures.
+    // TODO (dawnchen) figure out what we should do for rkt here.
+    if util.UsingSystemdInitSystem() {
+        if err := exec.Command("systemctl", "restart", "docker").Run(); err != nil {
+            glog.Error(err)
+        }
+    } else {
+        if err := exec.Command("service", "docker", "restart").Run(); err != nil {
+            glog.Error(err)
+        }
+    }
+    glog.V(2).Info("Recreated cbr0 and restarted docker")
+    return nil
+}
+
 func ensureCbr0(wantCIDR *net.IPNet) error {
+    exists, err := cbr0Exists()
+    if err != nil {
+        return err
+    }
+    if !exists {
+        glog.V(2).Infof("CBR0 doesn't exist, attempting to create it with range: %s", wantCIDR)
+        return createCBR0(wantCIDR)
+    }
     if !cbr0CidrCorrect(wantCIDR) {
         glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR)
 
@@ -40,30 +80,24 @@ func ensureCbr0(wantCIDR *net.IPNet) error {
             glog.Error(err)
             return err
         }
-        // recreate cbr0 with wantCIDR
-        if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        // restart docker
-        if err := exec.Command("service", "docker", "restart").Run(); err != nil {
-            glog.Error(err)
-            // For now just log the error. The containerRuntime check will catch docker failures.
-            // TODO (dawnchen) figure out what we should do for rkt here.
-        }
-        glog.V(2).Info("Recreated cbr0 and restarted docker")
+        return createCBR0(wantCIDR)
     }
     return nil
 }
 
+// Check if cbr0 network interface is configured or not, and take action
+// when the configuration is missing on the node, and propagate the rest
+// error to kubelet to handle.
+func cbr0Exists() (bool, error) {
+    if _, err := os.Stat("/sys/class/net/cbr0"); err != nil {
+        if os.IsNotExist(err) {
+            return false, nil
+        }
+        return false, err
+    }
+    return true, nil
+}
+
 func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
     output, err := exec.Command("ip", "addr", "show", "cbr0").Output()
     if err != nil {
@@ -79,6 +113,7 @@ func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
         return false
     }
     cbr0CIDR.IP = cbr0IP
+    glog.V(5).Infof("Want cbr0 CIDR: %s, have cbr0 CIDR: %s", wantCIDR, cbr0CIDR)
     return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask)
 }
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 47e86f6c9c2..93bde502005 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -147,6 +147,7 @@ func NewMainKubelet(
     dockerDaemonContainer string,
     systemContainer string,
     configureCBR0 bool,
+    podCIDR string,
     pods int,
     dockerExecHandler dockertools.ExecHandler) (*Kubelet, error) {
     if rootDirectory == "" {
@@ -261,6 +262,7 @@ func NewMainKubelet(
         cgroupRoot:      cgroupRoot,
         mounter:         mounter,
         configureCBR0:   configureCBR0,
+        podCIDR:         podCIDR,
         pods:            pods,
         syncLoopMonitor: util.AtomicValue{},
     }
@@ -318,6 +320,10 @@ func NewMainKubelet(
     }
     klet.containerManager = containerManager
+    // Start syncing node status immediately, this may set up things the runtime needs to run.
+    go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
+    go klet.syncNodeStatus()
+
     // Wait for the runtime to be up with a timeout.
     if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
         return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
     }
@@ -412,6 +418,10 @@ type Kubelet struct {
     runtimeUpThreshold     time.Duration
     lastTimestampRuntimeUp time.Time
 
+    // Network Status information
+    networkConfigMutex sync.Mutex
+    networkConfigured  bool
+
     // Volume plugins.
     volumePluginMgr volume.VolumePluginMgr
 
@@ -489,6 +499,7 @@ type Kubelet struct {
     // Whether or not kubelet should take responsibility for keeping cbr0 in
     // the correct state.
     configureCBR0 bool
+    podCIDR       string
 
     // Number of Pods which can be run by this Kubelet
     pods int
@@ -707,7 +718,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
     }
     go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
-    go kl.syncNodeStatus()
+
     // Run the system oom watcher forever.
     kl.statusManager.Start()
     kl.syncLoop(updates, kl)
 }
@@ -1705,6 +1716,11 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
         glog.Infof("Skipping pod synchronization, container runtime is not up.")
         return
     }
+    if !kl.doneNetworkConfigure() {
+        time.Sleep(5 * time.Second)
+        glog.Infof("Skipping pod synchronization, network is not configured")
+        return
+    }
     unsyncedPod := false
     podSyncTypes := make(map[types.UID]SyncPodType)
     select {
@@ -1861,6 +1877,7 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
         glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
         return nil
     }
+    glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
     _, cidr, err := net.ParseCIDR(podCIDR)
     if err != nil {
         return err
     }
@@ -1895,6 +1912,22 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
 // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
 var oldNodeUnschedulable bool
 
+func (kl *Kubelet) syncNetworkStatus() {
+    kl.networkConfigMutex.Lock()
+    defer kl.networkConfigMutex.Unlock()
+
+    networkConfigured := true
+    if kl.configureCBR0 {
+        if len(kl.podCIDR) == 0 {
+            networkConfigured = false
+        } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
+            networkConfigured = false
+            glog.Errorf("Error configuring cbr0: %v", err)
+        }
+    }
+    kl.networkConfigured = networkConfigured
+}
+
 // setNodeStatus fills in the Status fields of the given Node, overwriting
 // any fields that are currently set.
 func (kl *Kubelet) setNodeStatus(node *api.Node) error {
@@ -1928,16 +1961,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
         }
     }
 
-    networkConfigured := true
-    if kl.configureCBR0 {
-        if len(node.Spec.PodCIDR) == 0 {
-            networkConfigured = false
-        } else if err := kl.reconcileCBR0(node.Spec.PodCIDR); err != nil {
-            networkConfigured = false
-            glog.Errorf("Error configuring cbr0: %v", err)
-        }
-    }
-
     // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
     // cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
     info, err := kl.GetCachedMachineInfo()
@@ -1981,6 +2004,8 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
 
     // Check whether container runtime can be reported as up.
     containerRuntimeUp := kl.containerRuntimeUp()
+    // Check whether network is configured properly
+    networkConfigured := kl.doneNetworkConfigure()
 
     currentTime := util.Now()
     var newNodeReadyCondition api.NodeCondition
@@ -2049,6 +2074,12 @@ func (kl *Kubelet) containerRuntimeUp() bool {
     return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
 }
 
+func (kl *Kubelet) doneNetworkConfigure() bool {
+    kl.networkConfigMutex.Lock()
+    defer kl.networkConfigMutex.Unlock()
+    return kl.networkConfigured
+}
+
 // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
 // is set, this function will also confirm that cbr0 is configured correctly.
 func (kl *Kubelet) tryUpdateNodeStatus() error {
@@ -2059,6 +2090,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
     if node == nil {
         return fmt.Errorf("no node instance returned for %q", kl.nodeName)
     }
+    kl.podCIDR = node.Spec.PodCIDR
+
     if err := kl.setNodeStatus(node); err != nil {
         return err
     }
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 96183eda521..4957f57906d 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -127,6 +127,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
     }
     kubelet.volumeManager = newVolumeManager()
     kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "")
+    kubelet.networkConfigured = true
     return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }
 
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 89e9081a10c..dd28f7067ca 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -17,6 +17,7 @@ limitations under the License.
 package kubelet
 
 import (
+    "errors"
     "fmt"
     "reflect"
     "sort"
@@ -142,6 +143,9 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 
 // syncBatch syncs pods statuses with the apiserver.
 func (s *statusManager) syncBatch() error {
+    if s.kubeClient == nil {
+        return errors.New("Kubernetes client is nil, skipping pod status updates")
+    }
     syncRequest := <-s.podStatusChannel
     pod := syncRequest.pod
     podFullName := kubecontainer.GetPodFullName(pod)
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 17f56187ea4..fc335b95f8a 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -198,6 +198,20 @@ func CompileRegexps(regexpStrings []string) ([]*regexp.Regexp, error) {
     return regexps, nil
 }
 
+// Detects if using systemd as the init system
+// Please note that simply reading /proc/1/cmdline can be misleading because
+// some installation of various init programs can automatically make /sbin/init
+// a symlink or even a renamed version of their main program.
+// TODO(dchen1107): realiably detects the init system using on the system:
+// systemd, upstart, initd, etc.
+func UsingSystemdInitSystem() bool {
+    if _, err := os.Stat("/run/systemd/system"); err != nil {
+        return true
+    }
+
+    return false
+}
+
 // Writes 'value' to /proc//oom_score_adj. PID = 0 means self
 func ApplyOomScoreAdj(pid int, value int) error {
     if value < -1000 || value > 1000 {
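The first patch boils down to a small synchronization pattern inside the kubelet: the pod CIDR is learned asynchronously (from the --pod-cidr flag in standalone mode, or from Node.Spec.PodCIDR during node status updates), a background loop reconciles cbr0 and records the outcome behind a mutex, and pod synchronization is skipped until that record says the network is configured. What follows is a minimal, self-contained sketch of that gating pattern only; the networkGate type and its methods are invented for illustration and are not the kubelet's real API. The second patch, below, extends the same sync loop with an iptables MASQUERADE rule.

// Sketch only. Hypothetical names; run with `go run` to watch the gate open
// once a pod CIDR becomes available.
package main

import (
    "fmt"
    "sync"
    "time"
)

// networkGate mimics the fields the patch adds to the kubelet: a
// mutex-guarded pod CIDR plus a "network configured" flag.
type networkGate struct {
    mu         sync.Mutex
    podCIDR    string
    configured bool
}

// setPodCIDR records the CIDR learned from the API server (cluster mode) or
// from the --pod-cidr flag (standalone mode).
func (g *networkGate) setPodCIDR(cidr string) {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.podCIDR = cidr
}

// syncNetworkStatus plays the role of the periodic reconcile loop: the node
// only counts as network-ready once a CIDR is known and reconcile succeeded.
func (g *networkGate) syncNetworkStatus(reconcile func(cidr string) error) {
    g.mu.Lock()
    defer g.mu.Unlock()
    ok := g.podCIDR != ""
    if ok {
        if err := reconcile(g.podCIDR); err != nil {
            ok = false
        }
    }
    g.configured = ok
}

// done reports whether pod syncing may proceed.
func (g *networkGate) done() bool {
    g.mu.Lock()
    defer g.mu.Unlock()
    return g.configured
}

func main() {
    gate := &networkGate{}

    // Stand-in for tryUpdateNodeStatus fetching Node.Spec.PodCIDR later on.
    go func() {
        time.Sleep(50 * time.Millisecond)
        gate.setPodCIDR("10.244.1.0/24")
    }()

    // Stand-in for syncLoopIteration: skip pod sync until the gate opens.
    for i := 0; i < 20; i++ {
        gate.syncNetworkStatus(func(cidr string) error { return nil })
        if !gate.done() {
            fmt.Println("skipping pod sync: network not configured yet")
            time.Sleep(20 * time.Millisecond)
            continue
        }
        fmt.Println("network configured; pods can be synced")
        return
    }
}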
From 710fb4e4136678052629ac423c52e6615bef0e2e Mon Sep 17 00:00:00 2001
From: Dawn Chen
Date: Wed, 24 Jun 2015 12:56:36 -0700
Subject: [PATCH 2/2] add iptables rule for MASQUERADE for egress

---
 pkg/kubelet/container_bridge.go | 17 ++++++++++++++++-
 pkg/kubelet/kubelet.go          |  4 ++++
 pkg/kubelet/status_manager.go   |  4 ++--
 pkg/util/util.go                |  2 +-
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/pkg/kubelet/container_bridge.go b/pkg/kubelet/container_bridge.go
index 4ef58f4aaee..4155b3ffd44 100644
--- a/pkg/kubelet/container_bridge.go
+++ b/pkg/kubelet/container_bridge.go
@@ -39,7 +39,7 @@ func createCBR0(wantCIDR *net.IPNet) error {
         glog.Error(err)
         return err
     }
-    if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
+    if err := exec.Command("ip", "link", "set", "dev", "cbr0", "mtu", "1460", "up").Run(); err != nil {
         glog.Error(err)
         return err
     }
@@ -117,3 +117,18 @@ func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
     glog.V(5).Infof("Want cbr0 CIDR: %s, have cbr0 CIDR: %s", wantCIDR, cbr0CIDR)
     return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask)
 }
+
+// TODO(dawnchen): Using pkg/util/iptables
+func ensureIPTablesMasqRule() error {
+    // Check if the MASQUERADE rule exist or not
+    if err := exec.Command("iptables", "-t", "nat", "-C", "POSTROUTING", "-o", "eth0", "-j", "MASQUERADE", "!", "-d", "10.0.0.0/8").Run(); err == nil {
+        // The MASQUERADE rule exists
+        return nil
+    }
+
+    glog.Infof("MASQUERADE rule doesn't exist, recreate it")
+    if err := exec.Command("iptables", "-t", "nat", "-A", "POSTROUTING", "-o", "eth0", "-j", "MASQUERADE", "!", "-d", "10.0.0.0/8").Run(); err != nil {
+        return err
+    }
+    return nil
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 93bde502005..4b9e91890eb 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1918,6 +1918,10 @@ func (kl *Kubelet) syncNetworkStatus() {
 
     networkConfigured := true
     if kl.configureCBR0 {
+        if err := ensureIPTablesMasqRule(); err != nil {
+            networkConfigured = false
+            glog.Errorf("Error on adding ip table rules: %v", err)
+        }
         if len(kl.podCIDR) == 0 {
             networkConfigured = false
         } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index dd28f7067ca..d30f7bf30bc 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -17,7 +17,6 @@ limitations under the License.
 package kubelet
 
 import (
-    "errors"
     "fmt"
     "reflect"
     "sort"
@@ -144,7 +143,8 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 // syncBatch syncs pods statuses with the apiserver.
 func (s *statusManager) syncBatch() error {
     if s.kubeClient == nil {
-        return errors.New("Kubernetes client is nil, skipping pod status updates")
+        glog.V(4).Infof("Kubernetes client is nil, skipping pod status updates")
+        return nil
     }
     syncRequest := <-s.podStatusChannel
     pod := syncRequest.pod
diff --git a/pkg/util/util.go b/pkg/util/util.go
index fc335b95f8a..36aee047e91 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -205,7 +205,7 @@ func CompileRegexps(regexpStrings []string) ([]*regexp.Regexp, error) {
 // TODO(dchen1107): realiably detects the init system using on the system:
 // systemd, upstart, initd, etc.
 func UsingSystemdInitSystem() bool {
-    if _, err := os.Stat("/run/systemd/system"); err != nil {
+    if _, err := os.Stat("/run/systemd/system"); err == nil {
         return true
     }
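At its core, the MASQUERADE handling added in the second patch is an idempotent check-then-append against the nat table's POSTROUTING chain: "iptables -C" exits zero only when an identical rule already exists, so the rule is appended with "-A" only when that check fails. Below is a minimal sketch of the same pattern with the egress interface and the excluded cluster CIDR turned into parameters; the patch itself hardcodes eth0 and 10.0.0.0/8 and leaves a TODO to move to pkg/util/iptables. The helper name here is hypothetical, not the kubelet's function.

// Sketch only: check-then-append of a MASQUERADE rule, parameterized.
package main

import (
    "fmt"
    "os/exec"
)

// ensureMasqRule makes sure a single MASQUERADE rule exists for traffic
// leaving through iface, excluding destinations inside clusterCIDR.
// It needs root privileges and the iptables binary to do anything useful.
func ensureMasqRule(iface, clusterCIDR string) error {
    rule := []string{"-o", iface, "-j", "MASQUERADE", "!", "-d", clusterCIDR}

    // `iptables -C` exits 0 only if an identical rule is already present.
    check := append([]string{"-t", "nat", "-C", "POSTROUTING"}, rule...)
    if err := exec.Command("iptables", check...).Run(); err == nil {
        return nil
    }

    add := append([]string{"-t", "nat", "-A", "POSTROUTING"}, rule...)
    if out, err := exec.Command("iptables", add...).CombinedOutput(); err != nil {
        return fmt.Errorf("failed to add MASQUERADE rule: %v: %s", err, out)
    }
    return nil
}

func main() {
    // Same values the patch hardcodes.
    if err := ensureMasqRule("eth0", "10.0.0.0/8"); err != nil {
        fmt.Println(err)
    }
}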