Fix several issues with holding off syncPods until the network is configured.

Also fixed unit tests and compilation errors.
This commit is contained in:
Dawn Chen 2015-06-22 23:07:40 -07:00
parent 192ffdfb25
commit 23200d303f
7 changed files with 59 additions and 27 deletions

View File

@ -2,6 +2,5 @@ DOCKER_OPTS=""
{% if grains.docker_opts is defined and grains.docker_opts %} {% if grains.docker_opts is defined and grains.docker_opts %}
DOCKER_OPTS="${DOCKER_OPTS} {{grains.docker_opts}}" DOCKER_OPTS="${DOCKER_OPTS} {{grains.docker_opts}}"
{% endif %} {% endif %}
DOCKER_OPTS="${DOCKER_OPTS} --bridge=cbr0 --iptables=false --ip-masq=false" DOCKER_OPTS="${DOCKER_OPTS} --bridge=cbr0 --iptables=false --ip-masq=false"
DOCKER_NOFILE=1000000 DOCKER_NOFILE=1000000

View File

@ -354,6 +354,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
kc.DockerDaemonContainer, kc.DockerDaemonContainer,
kc.SystemContainer, kc.SystemContainer,
kc.ConfigureCBR0, kc.ConfigureCBR0,
kc.PodCIDR,
kc.MaxPods, kc.MaxPods,
kc.DockerExecHandler, kc.DockerExecHandler,
) )

View File

@ -23,6 +23,7 @@ import (
"os/exec" "os/exec"
"regexp" "regexp"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -43,10 +44,16 @@ func createCBR0(wantCIDR *net.IPNet) error {
return err return err
} }
// restart docker // restart docker
if err := exec.Command("service", "docker", "restart").Run(); err != nil { // For now just log the error. The containerRuntime check will catch docker failures.
glog.Error(err) // TODO (dawnchen) figure out what we should do for rkt here.
// For now just log the error. The containerRuntime check will catch docker failures. if util.UsingSystemdInitSystem() {
// TODO (dawnchen) figure out what we should do for rkt here. if err := exec.Command("systemctl", "restart", "docker").Run(); err != nil {
glog.Error(err)
}
} else {
if err := exec.Command("service", "docker", "restart").Run(); err != nil {
glog.Error(err)
}
} }
glog.V(2).Info("Recreated cbr0 and restarted docker") glog.V(2).Info("Recreated cbr0 and restarted docker")
return nil return nil
@ -60,7 +67,8 @@ func ensureCbr0(wantCIDR *net.IPNet) error {
if !exists { if !exists {
glog.V(2).Infof("CBR0 doesn't exist, attempting to create it with range: %s", wantCIDR) glog.V(2).Infof("CBR0 doesn't exist, attempting to create it with range: %s", wantCIDR)
return createCBR0(wantCIDR) return createCBR0(wantCIDR)
} else if !cbr0CidrCorrect(wantCIDR) { }
if !cbr0CidrCorrect(wantCIDR) {
glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR) glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR)
// delete cbr0 // delete cbr0
@ -78,8 +86,7 @@ func ensureCbr0(wantCIDR *net.IPNet) error {
} }
func cbr0Exists() (bool, error) { func cbr0Exists() (bool, error) {
_, err := os.Stat("/sys/class/net/cbr0") if _, err := os.Stat("/sys/class/net/cbr0"); err != nil {
if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return false, nil return false, nil
} }
@ -103,6 +110,7 @@ func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
return false return false
} }
cbr0CIDR.IP = cbr0IP cbr0CIDR.IP = cbr0IP
glog.V(5).Infof("Want cbr0 CIDR: %s, have cbr0 CIDR: %s", wantCIDR, cbr0CIDR) glog.V(5).Infof("Want cbr0 CIDR: %s, have cbr0 CIDR: %s", wantCIDR, cbr0CIDR)
return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask) return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask)
} }

View File

@ -321,8 +321,8 @@ func NewMainKubelet(
klet.containerManager = containerManager klet.containerManager = containerManager
// Start syncing node status immediately, this may set up things the runtime needs to run. // Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
go klet.syncNodeStatus() go klet.syncNodeStatus()
go klet.syncNetworkStatus()
// Wait for the runtime to be up with a timeout. // Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil { if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
@ -419,7 +419,8 @@ type Kubelet struct {
lastTimestampRuntimeUp time.Time lastTimestampRuntimeUp time.Time
// Network Status information // Network Status information
networkConfigured bool networkConfigMutex sync.Mutex
networkConfigured bool
// Volume plugins. // Volume plugins.
volumePluginMgr volume.VolumePluginMgr volumePluginMgr volume.VolumePluginMgr
@ -717,6 +718,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
} }
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop) go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Run the system oom watcher forever. // Run the system oom watcher forever.
kl.statusManager.Start() kl.statusManager.Start()
kl.syncLoop(updates, kl) kl.syncLoop(updates, kl)
@ -1714,9 +1716,10 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
glog.Infof("Skipping pod synchronization, container runtime is not up.") glog.Infof("Skipping pod synchronization, container runtime is not up.")
return return
} }
if !kl.networkConfigured { if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured") glog.Infof("Skipping pod synchronization, network is not configured")
return
} }
unsyncedPod := false unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType) podSyncTypes := make(map[types.UID]SyncPodType)
@ -1871,6 +1874,7 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.") glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
return nil return nil
} }
glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
_, cidr, err := net.ParseCIDR(podCIDR) _, cidr, err := net.ParseCIDR(podCIDR)
if err != nil { if err != nil {
return err return err
@ -1906,19 +1910,19 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
var oldNodeUnschedulable bool var oldNodeUnschedulable bool
func (kl *Kubelet) syncNetworkStatus() { func (kl *Kubelet) syncNetworkStatus() {
for { kl.networkConfigMutex.Lock()
networkConfigured := true defer kl.networkConfigMutex.Unlock()
if kl.configureCBR0 {
if len(kl.podCIDR) == 0 { networkConfigured := true
networkConfigured = false if kl.configureCBR0 {
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil { if len(kl.podCIDR) == 0 {
networkConfigured = false networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err) } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
} networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
} }
kl.networkConfigured = networkConfigured
time.Sleep(30 * time.Second)
} }
kl.networkConfigured = networkConfigured
} }
// setNodeStatus fills in the Status fields of the given Node, overwriting // setNodeStatus fills in the Status fields of the given Node, overwriting
@ -1997,11 +2001,13 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Check whether container runtime can be reported as up. // Check whether container runtime can be reported as up.
containerRuntimeUp := kl.containerRuntimeUp() containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
currentTime := util.Now() currentTime := util.Now()
var newNodeReadyCondition api.NodeCondition var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && kl.networkConfigured { if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{ newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionTrue, Status: api.ConditionTrue,
@ -2013,7 +2019,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
if !containerRuntimeUp { if !containerRuntimeUp {
reasons = append(reasons, "container runtime is down") reasons = append(reasons, "container runtime is down")
} }
if !kl.networkConfigured { if !networkConfigured {
reasons = append(reasons, "network not configured correctly") reasons = append(reasons, "network not configured correctly")
} }
newNodeReadyCondition = api.NodeCondition{ newNodeReadyCondition = api.NodeCondition{
@ -2065,6 +2071,12 @@ func (kl *Kubelet) containerRuntimeUp() bool {
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now()) return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
} }
// doneNetworkConfigure returns the current value of kl.networkConfigured,
// reading it while holding kl.networkConfigMutex.
func (kl *Kubelet) doneNetworkConfigure() bool {
	kl.networkConfigMutex.Lock()
	configured := kl.networkConfigured
	kl.networkConfigMutex.Unlock()
	return configured
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly. // is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error { func (kl *Kubelet) tryUpdateNodeStatus() error {

View File

@ -127,6 +127,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
} }
kubelet.volumeManager = newVolumeManager() kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "") kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "")
kubelet.networkConfigured = true
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
} }

View File

@ -21,7 +21,6 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -60,8 +59,6 @@ func (s *statusManager) Start() {
err := s.syncBatch() err := s.syncBatch()
if err != nil { if err != nil {
glog.Warningf("Failed to updated pod status: %v", err) glog.Warningf("Failed to updated pod status: %v", err)
// Errors and tight-looping are bad, m-kay
time.Sleep(30 * time.Second)
} }
}, 0) }, 0)
} }

View File

@ -198,6 +198,20 @@ func CompileRegexps(regexpStrings []string) ([]*regexp.Regexp, error) {
return regexps, nil return regexps, nil
} }
// UsingSystemdInitSystem reports whether systemd is the running init system.
//
// It checks for the /run/systemd/system directory, the same check that
// sd_booted(3) performs. Simply reading /proc/1/cmdline can be misleading,
// because some installations of various init programs make /sbin/init a
// symlink to, or even a renamed version of, their main program.
//
// Any stat failure, including the path not existing, yields false.
//
// TODO(dchen1107): reliably detect which init system is in use on the host:
// systemd, upstart, initd, etc.
func UsingSystemdInitSystem() bool {
	info, err := os.Stat("/run/systemd/system")
	// The directory being present is what indicates systemd. The earlier
	// check was inverted (err != nil) and reported systemd exactly when the
	// directory was missing.
	return err == nil && info.IsDir()
}
// Writes 'value' to /proc/<pid>/oom_score_adj. PID = 0 means self // Writes 'value' to /proc/<pid>/oom_score_adj. PID = 0 means self
func ApplyOomScoreAdj(pid int, value int) error { func ApplyOomScoreAdj(pid int, value int) error {
if value < -1000 || value > 1000 { if value < -1000 || value > 1000 {