Fix the container bridge so that it can create cbr0

Fix the kubelet so that it tries to sync status, even if Docker is down
This commit is contained in:
Brendan Burns
2015-06-19 22:49:18 -07:00
committed by Dawn Chen
parent a29ade2f33
commit 192ffdfb25
6 changed files with 98 additions and 41 deletions

View File

@@ -19,6 +19,7 @@ package kubelet
import (
"bytes"
"net"
"os"
"os/exec"
"regexp"
@@ -27,8 +28,39 @@ import (
var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`)
// createCBR0 builds the cbr0 bridge from scratch: it adds the bridge with
// brctl, assigns wantCIDR to it, brings the link up and finally restarts
// docker through the service command.
//
// The first failure among the three bridge set-up commands is logged and
// returned. A failed docker restart is only logged; the function still
// returns nil in that case.
func createCBR0(wantCIDR *net.IPNet) error {
	// run executes a single command, logging any failure before handing the
	// error back to the caller.
	run := func(name string, args ...string) error {
		err := exec.Command(name, args...).Run()
		if err != nil {
			glog.Error(err)
		}
		return err
	}

	// Create the bridge device.
	if err := run("brctl", "addbr", "cbr0"); err != nil {
		return err
	}
	// Give it the requested address range.
	if err := run("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0"); err != nil {
		return err
	}
	// Bring the interface up.
	if err := run("ip", "link", "set", "dev", "cbr0", "up"); err != nil {
		return err
	}

	// Restart docker. A failure here is deliberately not returned: for now it
	// is only logged, and the containerRuntime check will catch docker failures.
	// TODO (dawnchen) figure out what we should do for rkt here.
	_ = run("service", "docker", "restart")

	glog.V(2).Info("Recreated cbr0 and restarted docker")
	return nil
}
func ensureCbr0(wantCIDR *net.IPNet) error {
if !cbr0CidrCorrect(wantCIDR) {
exists, err := cbr0Exists()
if err != nil {
return err
}
if !exists {
glog.V(2).Infof("CBR0 doesn't exist, attempting to create it with range: %s", wantCIDR)
return createCBR0(wantCIDR)
} else if !cbr0CidrCorrect(wantCIDR) {
glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR)
// delete cbr0
@@ -40,30 +72,22 @@ func ensureCbr0(wantCIDR *net.IPNet) error {
glog.Error(err)
return err
}
// recreate cbr0 with wantCIDR
if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
glog.Error(err)
return err
}
if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
glog.Error(err)
return err
}
if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
glog.Error(err)
return err
}
// restart docker
if err := exec.Command("service", "docker", "restart").Run(); err != nil {
glog.Error(err)
// For now just log the error. The containerRuntime check will catch docker failures.
// TODO (dawnchen) figure out what we should do for rkt here.
}
glog.V(2).Info("Recreated cbr0 and restarted docker")
return createCBR0(wantCIDR)
}
return nil
}
// cbr0Exists reports whether the cbr0 network device is present by stat-ing
// its entry under /sys/class/net.
//
// It returns (true, nil) when the entry exists, (false, nil) when the stat
// fails with a not-exist error, and (false, err) for any other stat failure.
func cbr0Exists() (bool, error) {
	_, statErr := os.Stat("/sys/class/net/cbr0")
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		// A missing entry is a normal answer, not an error.
		return false, nil
	default:
		return false, statErr
	}
}
func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
output, err := exec.Command("ip", "addr", "show", "cbr0").Output()
if err != nil {

View File

@@ -147,6 +147,7 @@ func NewMainKubelet(
dockerDaemonContainer string,
systemContainer string,
configureCBR0 bool,
podCIDR string,
pods int,
dockerExecHandler dockertools.ExecHandler) (*Kubelet, error) {
if rootDirectory == "" {
@@ -261,6 +262,7 @@ func NewMainKubelet(
cgroupRoot: cgroupRoot,
mounter: mounter,
configureCBR0: configureCBR0,
podCIDR: podCIDR,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
}
@@ -318,6 +320,10 @@ func NewMainKubelet(
}
klet.containerManager = containerManager
// Start syncing node status immediately, this may set up things the runtime needs to run.
go klet.syncNodeStatus()
go klet.syncNetworkStatus()
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
@@ -412,6 +418,9 @@ type Kubelet struct {
runtimeUpThreshold time.Duration
lastTimestampRuntimeUp time.Time
// Network Status information
networkConfigured bool
// Volume plugins.
volumePluginMgr volume.VolumePluginMgr
@@ -489,6 +498,7 @@ type Kubelet struct {
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
podCIDR string
// Number of Pods which can be run by this Kubelet
pods int
@@ -707,7 +717,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
}
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
go kl.syncNodeStatus()
// Run the system oom watcher forever.
kl.statusManager.Start()
kl.syncLoop(updates, kl)
@@ -1705,6 +1714,10 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
glog.Infof("Skipping pod synchronization, container runtime is not up.")
return
}
if !kl.networkConfigured {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
select {
@@ -1892,6 +1905,22 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
// syncNetworkStatus loops forever, refreshing kl.networkConfigured every
// 30 seconds.
//
// When the kubelet is not responsible for cbr0 (configureCBR0 is false) the
// network is considered configured. Otherwise it is configured only if a pod
// CIDR is known and reconcileCBR0 succeeds for it; a reconcile error is logged.
//
// NOTE(review): this function never returns and is meant to be run in its own
// goroutine. kl.networkConfigured and kl.podCIDR are accessed here without any
// locking while other kubelet code also reads/writes them — confirm whether
// synchronization is needed.
func (kl *Kubelet) syncNetworkStatus() {
	for ; ; time.Sleep(30 * time.Second) {
		if !kl.configureCBR0 {
			// Nothing for the kubelet to manage.
			kl.networkConfigured = true
			continue
		}
		if kl.podCIDR == "" {
			// No pod CIDR is known yet, so the bridge cannot be set up.
			kl.networkConfigured = false
			continue
		}
		reconcileErr := kl.reconcileCBR0(kl.podCIDR)
		if reconcileErr != nil {
			glog.Errorf("Error configuring cbr0: %v", reconcileErr)
		}
		kl.networkConfigured = reconcileErr == nil
	}
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
@@ -1925,16 +1954,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
}
}
networkConfigured := true
if kl.configureCBR0 {
if len(node.Spec.PodCIDR) == 0 {
networkConfigured = false
} else if err := kl.reconcileCBR0(node.Spec.PodCIDR); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
@@ -1982,7 +2001,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
currentTime := util.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
if containerRuntimeUp && kl.networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
@@ -1994,7 +2013,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
if !containerRuntimeUp {
reasons = append(reasons, "container runtime is down")
}
if !networkConfigured {
if !kl.networkConfigured {
reasons = append(reasons, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
@@ -2056,6 +2075,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
kl.podCIDR = node.Spec.PodCIDR
if err := kl.setNodeStatus(node); err != nil {
return err
}

View File

@@ -17,9 +17,11 @@ limitations under the License.
package kubelet
import (
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -58,6 +60,8 @@ func (s *statusManager) Start() {
err := s.syncBatch()
if err != nil {
glog.Warningf("Failed to updated pod status: %v", err)
// Back off after a failed sync so a persistent error does not become a tight retry loop.
time.Sleep(30 * time.Second)
}
}, 0)
}
@@ -124,6 +128,9 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
// syncBatch syncs pods statuses with the apiserver.
func (s *statusManager) syncBatch() error {
if s.kubeClient == nil {
return errors.New("Kubernetes client is nil, skipping pod status updates")
}
syncRequest := <-s.podStatusChannel
pod := syncRequest.pod
podFullName := kubecontainer.GetPodFullName(pod)