Merge pull request #7984 from cjcullen/kubelet

Kubelet configures cbr0 instead of configure-vm.sh
Dawn Chen 2015-05-13 17:32:52 -07:00
commit 309a157665
6 changed files with 151 additions and 51 deletions


@@ -64,14 +64,6 @@ for k,v in yaml.load(sys.stdin).iteritems():
else
KUBERNETES_MASTER="true"
fi
if [[ "${KUBERNETES_MASTER}" != "true" ]] && [[ -z "${MINION_IP_RANGE:-}" ]]; then
# This block of code should go away once the master can allocate CIDRs
until MINION_IP_RANGE=$(curl-metadata node-ip-range); do
echo 'Waiting for metadata MINION_IP_RANGE...'
sleep 3
done
fi
}
function remove-docker-artifacts() {
@@ -411,7 +403,7 @@ function salt-node-role() {
grains:
roles:
- kubernetes-pool
cbr-cidr: '$(echo "$MINION_IP_RANGE" | sed -e "s/'/''/g")'
cbr-cidr: 10.123.45.0/30
cloud: gce
EOF
}


@@ -40,4 +40,9 @@
{% set docker_root = " --docker_root=" + grains.docker_root -%}
{% endif -%}
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}}"
{% set configure_cbr0 = "" -%}
{% if pillar['allocate_node_cidrs'] is defined -%}
{% set configure_cbr0 = "--configure-cbr0=" + pillar['allocate_node_cidrs'] -%}
{% endif -%}
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{configure_cbr0}}"


@@ -107,6 +107,7 @@ type KubeletServer struct {
CgroupRoot string
ContainerRuntime string
DockerDaemonContainer string
ConfigureCBR0 bool
// Flags intended for testing
@@ -166,6 +167,7 @@ func NewKubeletServer() *KubeletServer {
CgroupRoot: "",
ContainerRuntime: "docker",
DockerDaemonContainer: "/docker-daemon",
ConfigureCBR0: false,
}
}
@@ -223,6 +225,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.CgroupRoot, "cgroup_root", s.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.")
fs.StringVar(&s.ContainerRuntime, "container_runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'. Default: 'docker'.")
fs.StringVar(&s.DockerDaemonContainer, "docker-daemon-container", s.DockerDaemonContainer, "Optional resource-only container in which to place the Docker Daemon. Empty for no container (Default: /docker-daemon).")
fs.BoolVar(&s.ConfigureCBR0, "configure-cbr0", s.ConfigureCBR0, "If true, kubelet will configure cbr0 based on Node.Spec.PodCIDR.")
// Flags intended for testing, not recommended used in production environments.
fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
@@ -338,6 +341,7 @@ func (s *KubeletServer) Run(_ []string) error {
ContainerRuntime: s.ContainerRuntime,
Mounter: mounter,
DockerDaemonContainer: s.DockerDaemonContainer,
ConfigureCBR0: s.ConfigureCBR0,
}
RunKubelet(&kcfg, nil)
@@ -628,6 +632,7 @@ type KubeletConfig struct {
ContainerRuntime string
Mounter mount.Interface
DockerDaemonContainer string
ConfigureCBR0 bool
}
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
@@ -677,7 +682,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.CgroupRoot,
kc.ContainerRuntime,
kc.Mounter,
kc.DockerDaemonContainer)
kc.DockerDaemonContainer,
kc.ConfigureCBR0)
if err != nil {
return nil, nil, err
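The cmd/kubelet changes above amount to threading one boolean from the command line into the kubelet constructor. A minimal, self-contained sketch of that plumbing, using pflag as the kubelet does (the server and config types and the main function here are illustrative stand-ins, not the real KubeletServer or KubeletConfig):

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

// server stands in for KubeletServer: it owns the flag value and its default.
type server struct {
    ConfigureCBR0 bool
}

// config stands in for KubeletConfig: the value is copied in when the kubelet is built.
type config struct {
    ConfigureCBR0 bool
}

func main() {
    s := &server{ConfigureCBR0: false} // same default as NewKubeletServer above
    fs := pflag.NewFlagSet("kubelet", pflag.ExitOnError)
    fs.BoolVar(&s.ConfigureCBR0, "configure-cbr0", s.ConfigureCBR0,
        "If true, kubelet will configure cbr0 based on Node.Spec.PodCIDR.")
    fs.Parse([]string{"--configure-cbr0=true"})

    kcfg := config{ConfigureCBR0: s.ConfigureCBR0}
    fmt.Println("configure cbr0:", kcfg.ConfigureCBR0) // prints: configure cbr0: true
}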


@@ -17,7 +17,6 @@ limitations under the License.
package gce_cloud
import (
"errors"
"fmt"
"io"
"io/ioutil"
@@ -43,10 +42,6 @@ import (
"google.golang.org/cloud/compute/metadata"
)
var ErrMetadataConflict = errors.New("Metadata already set at the same key")
const podCIDRMetadataKey string = "node-ip-range"
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
@@ -562,44 +557,23 @@ func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
instanceName := canonicalizeInstanceName(name)
instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do()
if err != nil {
return err
}
if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok {
if currentValue == spec.PodCIDR {
// IP range already set to proper value.
return nil
}
return ErrMetadataConflict
}
// We are setting the metadata, so they can be picked-up by the configure-vm.sh script to start docker with the given CIDR for Pods.
instance.Metadata.Items = append(instance.Metadata.Items,
&compute.MetadataItems{
Key: podCIDRMetadataKey,
Value: spec.PodCIDR,
})
setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata)
setMetadataOp, err := setMetadataCall.Do()
if err != nil {
return err
}
err = gce.waitForZoneOp(setMetadataOp)
if err != nil {
return err
}
insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{
insertOp, err := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: instanceName,
DestRange: spec.PodCIDR,
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName),
Network: fmt.Sprintf("global/networks/%s", gce.networkName),
Priority: 1000,
})
insertOp, err := insertCall.Do()
}).Do()
if err != nil {
return err
}
return gce.waitForGlobalOp(insertOp)
if err := gce.waitForGlobalOp(insertOp); err != nil {
if gapiErr, ok := err.(*googleapi.Error); ok && gapiErr.Code == http.StatusConflict {
// TODO (cjcullen): Make this actually check the route is correct.
return nil
}
}
return err
}
func (gce *GCECloud) Release(name string) error {
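With the metadata round-trip removed, Configure now only inserts a per-node route and treats an HTTP 409 Conflict from the Compute API as "route already exists". A hedged, self-contained sketch of that conflict check, using only the googleapi error type (isConflict and the fabricated error value are illustrative, not part of the provider):

package main

import (
    "fmt"
    "net/http"

    "google.golang.org/api/googleapi"
)

// isConflict reports whether err is a Compute API error with status 409,
// i.e. the resource (here, the per-node route) already exists.
func isConflict(err error) bool {
    gapiErr, ok := err.(*googleapi.Error)
    return ok && gapiErr.Code == http.StatusConflict
}

func main() {
    // Simulate the error a Routes.Insert operation could surface when the
    // route for this instance was already created on an earlier attempt.
    var err error = &googleapi.Error{Code: http.StatusConflict, Message: "route already exists"}

    if isConflict(err) {
        fmt.Println("route already present, treating insert as a success")
    }
}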


@@ -0,0 +1,84 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"bytes"
"net"
"os/exec"
"regexp"
"github.com/golang/glog"
)
var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`)
func ensureCbr0(wantCIDR *net.IPNet) error {
if !cbr0CidrCorrect(wantCIDR) {
glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR)
// delete cbr0
if err := exec.Command("ip", "link", "set", "dev", "cbr0", "down").Run(); err != nil {
glog.Error(err)
return err
}
if err := exec.Command("brctl", "delbr", "cbr0").Run(); err != nil {
glog.Error(err)
return err
}
// recreate cbr0 with wantCIDR
if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
glog.Error(err)
return err
}
if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
glog.Error(err)
return err
}
if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
glog.Error(err)
return err
}
// restart docker
if err := exec.Command("service", "docker", "restart").Run(); err != nil {
glog.Error(err)
// For now just log the error. The containerRuntime check will catch docker failures.
// TODO (dawnchen) figure out what we should do for rkt here.
}
glog.V(2).Info("Recreated cbr0 and restarted docker")
}
return nil
}
func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
output, err := exec.Command("ip", "addr", "show", "cbr0").Output()
if err != nil {
return false
}
match := cidrRegexp.FindSubmatch(output)
if len(match) < 2 {
return false
}
cbr0IP, cbr0CIDR, err := net.ParseCIDR(string(match[1]))
if err != nil {
glog.Errorf("Couldn't parse CIDR: %q", match[1])
return false
}
cbr0CIDR.IP = cbr0IP
glog.V(5).Infof("Want cbr0 CIDR: %s, have cbr0 CIDR: %s", wantCIDR, cbr0CIDR)
return wantCIDR.IP.Equal(cbr0IP) && bytes.Equal(wantCIDR.Mask, cbr0CIDR.Mask)
}
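cbr0CidrCorrect shells out to "ip addr show cbr0" and extracts the interface CIDR with cidrRegexp before comparing it to the desired range. The standalone sketch below runs the same regexp and parsing steps against a canned sample of "ip addr" output (the sample text and addresses are illustrative):

package main

import (
    "fmt"
    "net"
    "regexp"
)

// Same pattern as cidrRegexp in the new cbr0.go: capture "address/prefix" after "inet".
var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`)

func main() {
    // Illustrative output of "ip addr show cbr0" on a node whose bridge is already set up.
    output := []byte(`4: cbr0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1460 qdisc htb state UP
    link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff
    inet 10.244.1.1/24 scope global cbr0
       valid_lft forever preferred_lft forever`)

    match := cidrRegexp.FindSubmatch(output)
    if len(match) < 2 {
        fmt.Println("no inet address found on cbr0")
        return
    }

    cbr0IP, cbr0CIDR, err := net.ParseCIDR(string(match[1]))
    if err != nil {
        fmt.Printf("couldn't parse CIDR %q: %v\n", match[1], err)
        return
    }
    // As in cbr0CidrCorrect, keep the interface address (not the network address) in the IPNet.
    cbr0CIDR.IP = cbr0IP
    fmt.Println("cbr0 has", cbr0CIDR) // prints: cbr0 has 10.244.1.1/24
}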


@@ -138,7 +138,8 @@ func NewMainKubelet(
cgroupRoot string,
containerRuntime string,
mounter mount.Interface,
dockerDaemonContainer string) (*Kubelet, error) {
dockerDaemonContainer string,
configureCBR0 bool) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@@ -244,6 +245,7 @@ func NewMainKubelet(
oomWatcher: oomWatcher,
cgroupRoot: cgroupRoot,
mounter: mounter,
configureCBR0: configureCBR0,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@@ -456,6 +458,10 @@ type Kubelet struct {
// Manager of non-Runtime containers.
containerManager containerManager
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -1588,6 +1594,23 @@ func (kl *Kubelet) updateRuntimeUp() {
}
}
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if podCIDR == "" {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
return nil
}
_, cidr, err := net.ParseCIDR(podCIDR)
if err != nil {
return err
}
// Set cbr0 interface address to first address in IPNet
cidr.IP.To4()[3] += 1
if err := ensureCbr0(cidr); err != nil {
return err
}
return nil
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
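In reconcileCBR0, the line cidr.IP.To4()[3] += 1 advances the parsed network address to the first usable address in the node's pod CIDR, which becomes the cbr0 bridge address: a PodCIDR of 10.244.1.0/24 yields 10.244.1.1/24. A minimal sketch of that derivation, assuming an IPv4 CIDR as the code above does (the helper name and example range are illustrative):

package main

import (
    "fmt"
    "net"
)

// firstAddressIn returns the pod CIDR with its IP advanced to the first usable
// address, which is what reconcileCBR0 hands to ensureCbr0 for the bridge.
func firstAddressIn(podCIDR string) (*net.IPNet, error) {
    _, cidr, err := net.ParseCIDR(podCIDR)
    if err != nil {
        return nil, err
    }
    // Same arithmetic as reconcileCBR0: bump the last octet of the network address by one.
    cidr.IP.To4()[3] += 1
    return cidr, nil
}

func main() {
    bridgeCIDR, err := firstAddressIn("10.244.1.0/24")
    if err != nil {
        panic(err)
    }
    fmt.Println(bridgeCIDR) // prints: 10.244.1.1/24
}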
@@ -1622,7 +1645,8 @@ func (kl *Kubelet) recordNodeUnschedulableEvent() {
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
// tryUpdateNodeStatus tries to update node status to master.
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
@@ -1632,6 +1656,14 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
return fmt.Errorf("no node instance returned for %q", kl.hostname)
}
networkConfigured := true
if kl.configureCBR0 {
if err := kl.reconcileCBR0(node.Spec.PodCIDR); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
@@ -1673,18 +1705,25 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
currentTime := util.Now()
var newCondition api.NodeCondition
if containerRuntimeUp {
if containerRuntimeUp && networkConfigured {
newCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: fmt.Sprintf("kubelet is posting ready status"),
Reason: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var reasons []string
if !containerRuntimeUp {
reasons = append(reasons, "container runtime is down")
}
if !networkConfigured {
reasons = append(reasons, "network not configured correctly")
}
newCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: fmt.Sprintf("container runtime is down"),
Reason: strings.Join(reasons, ","),
LastHeartbeatTime: currentTime,
}
}
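The net effect of the tryUpdateNodeStatus changes is that NodeReady now requires both a healthy container runtime and a correctly configured network, with the failure reasons joined into one string. A small sketch of that reason composition and the conditions it produces (readyReason is an illustrative helper, not a kubelet function):

package main

import (
    "fmt"
    "strings"
)

// readyReason mirrors the condition logic above: the node is Ready only if both
// checks pass; otherwise the individual failure reasons are joined with commas.
func readyReason(containerRuntimeUp, networkConfigured bool) (ready bool, reason string) {
    if containerRuntimeUp && networkConfigured {
        return true, "kubelet is posting ready status"
    }
    var reasons []string
    if !containerRuntimeUp {
        reasons = append(reasons, "container runtime is down")
    }
    if !networkConfigured {
        reasons = append(reasons, "network not configured correctly")
    }
    return false, strings.Join(reasons, ",")
}

func main() {
    for _, c := range [][2]bool{{true, true}, {true, false}, {false, false}} {
        ready, reason := readyReason(c[0], c[1])
        fmt.Printf("runtime up=%v, network ok=%v -> ready=%v (%q)\n", c[0], c[1], ready, reason)
    }
}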