Flannel handshakes with kubelet.

This commit is contained in:
Prashanth Balasubramanian 2015-11-20 19:41:32 -08:00 committed by gmarek
parent 4cd1ee177b
commit 7aa8ebe30f
5 changed files with 197 additions and 10 deletions

View File

@ -4,16 +4,16 @@ flannel-tar:
- user: root
- name: /usr/local/src
- makedirs: True
- source: https://github.com/coreos/flannel/releases/download/v0.5.3/flannel-0.5.3-linux-amd64.tar.gz
- source: https://github.com/coreos/flannel/releases/download/v0.5.5/flannel-0.5.5-linux-amd64.tar.gz
- tar_options: v
- source_hash: md5=2a82ed82a37d71c85586977f0e475b70
- source_hash: md5=972c717254775bef528f040af804f2cc
- archive_format: tar
- if_missing: /usr/local/src/flannel/flannel-0.5.3/
- if_missing: /usr/local/src/flannel/flannel-0.5.5/
flannel-symlink:
file.symlink:
- name: /usr/local/bin/flanneld
- target: /usr/local/src/flannel-0.5.3/flanneld
- target: /usr/local/src/flannel-0.5.5/flanneld
- force: true
- watch:
- archive: flannel-tar

View File

@ -39,7 +39,7 @@
{% set root_ca_file = "--root-ca-file=/srv/kubernetes/ca.crt" -%}
{% endif -%}
{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " " + allocate_node_cidrs + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + service_account_key + pillar['log_level'] + " " + root_ca_file -%}
{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " --allocate-node-cidrs=false" + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + service_account_key + pillar['log_level'] + " " + root_ca_file -%}
# test_args has to be kept at the end, so they'll overwrite any prior configuration

View File

@ -67,7 +67,11 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
const defaultRootDir = "/var/lib/kubelet"
const (
defaultRootDir = "/var/lib/kubelet"
networkConfig = "/var/run/flannel/network.json"
useDefaultOverlay = true
)
// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
@ -155,6 +159,10 @@ type KubeletServer struct {
// Pull images one at a time.
SerializeImagePulls bool
// Flannel config parameters
UseDefaultOverlay bool
NetworkConfig string
}
// bootstrapping interface for kubelet, targets the initialization protocol
@ -227,6 +235,9 @@ func NewKubeletServer() *KubeletServer {
ReconcileCIDR: true,
KubeAPIQPS: 5.0,
KubeAPIBurst: 10,
// Flannel parameters
UseDefaultOverlay: useDefaultOverlay,
// NetworkConfig: networkConfig,
}
}
@ -341,6 +352,10 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]")
// Flannel config parameters
fs.BoolVar(&s.UseDefaultOverlay, "use-default-overlay", s.UseDefaultOverlay, "Experimental support for starting the kubelet with the default overlay network (flannel). Assumes flanneld is already running in client mode. [default=false]")
fs.StringVar(&s.NetworkConfig, "network-config", s.NetworkConfig, "Absolute path to a network json file, as accepted by flannel.")
}
// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup
@ -478,6 +493,10 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
TLSOptions: tlsOptions,
Writer: writer,
VolumePlugins: ProbeVolumePlugins(),
// Flannel options
UseDefaultOverlay: s.UseDefaultOverlay,
NetworkConfig: s.NetworkConfig,
}, nil
}
@ -949,6 +968,10 @@ type KubeletConfig struct {
TLSOptions *kubelet.TLSOptions
Writer io.Writer
VolumePlugins []volume.VolumePlugin
// Flannel parameters
UseDefaultOverlay bool
NetworkConfig string
}
func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
@ -1031,6 +1054,9 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
// Flannel parameters
kc.UseDefaultOverlay,
//kc.NetworkConfig,
)
if err != nil {

View File

@ -0,0 +1,132 @@
package kubelet
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"strconv"
"strings"
"github.com/golang/glog"
)
const (
networkType = "vxlan"
dockerOptsFile = "/etc/default/docker"
flannelSubnetKey = "FLANNEL_SUBNET"
flannelNetworkKey = "FLANNEL_NETWORK"
flannelMtuKey = "FLANNEL_MTU"
dockerOptsKey = "DOCKER_OPTS"
flannelSubnetFile = "/var/run/flannel/subnet.env"
)
type FlannelServer struct {
subnetFile string
// TODO: Manage subnet file.
}
func NewFlannelServer() *FlannelServer {
return &FlannelServer{flannelSubnetFile}
}
func (f *FlannelServer) Handshake() (podCIDR string, err error) {
// Flannel daemon will hang till the server comes up, kubelet will hang until
// flannel daemon has written subnet env variables. This is the kubelet handshake.
// To improve performance, we could defer just the configuration of the container
// bridge till after subnet.env is written. Keeping it local is clearer for now.
// TODO: Using a file to communicate is brittle
if _, err = os.Stat(f.subnetFile); err != nil {
return "", fmt.Errorf("Waiting for subnet file %v", f.subnetFile)
}
glog.Infof("(kubelet)Found flannel subnet file %v", f.subnetFile)
// TODO: Rest of this function is a hack.
config, err := parseKVConfig(f.subnetFile)
if err != nil {
return "", err
}
if err = writeDockerOptsFromFlannelConfig(config); err != nil {
return "", err
}
podCIDR, ok := config[flannelSubnetKey]
if !ok {
return "", fmt.Errorf("No flannel subnet, config %+v", config)
}
kubeNetwork, ok := config[flannelNetworkKey]
if !ok {
return "", fmt.Errorf("No flannel network, config %+v", config)
}
if err := exec.Command("iptables",
"-t", "nat",
"-A", "POSTROUTING",
"!", "-d", kubeNetwork,
"-s", podCIDR,
"-j", "MASQUERADE").Run(); err != nil {
return "", fmt.Errorf("Unable to install iptables rule for flannel.")
}
return podCIDR, nil
}
// Take env variables from flannel subnet env and write to /etc/docker/defaults.
func writeDockerOptsFromFlannelConfig(flannelConfig map[string]string) error {
// TODO: Write dockeropts to unit file on systemd machines
// https://github.com/docker/docker/issues/9889
mtu, ok := flannelConfig[flannelMtuKey]
if !ok {
return fmt.Errorf("No flannel mtu, flannel config %+v", flannelConfig)
}
dockerOpts, err := parseKVConfig(dockerOptsFile)
if err != nil {
return err
}
opts, ok := dockerOpts[dockerOptsKey]
if !ok {
glog.Errorf("(kubelet)Did not find docker opts, writing them")
opts = fmt.Sprintf(
" --bridge=cbr0 --iptables=false --ip-masq=false")
} else {
opts, _ = strconv.Unquote(opts)
}
dockerOpts[dockerOptsKey] = fmt.Sprintf("\"%v --mtu=%v\"", opts, mtu)
if err = writeKVConfig(dockerOptsFile, dockerOpts); err != nil {
return err
}
return nil
}
// parseKVConfig takes a file with key-value env variables and returns a dictionary mapping the same.
func parseKVConfig(filename string) (map[string]string, error) {
config := map[string]string{}
if _, err := os.Stat(filename); err != nil {
return config, err
}
buff, err := ioutil.ReadFile(filename)
if err != nil {
return config, err
}
str := string(buff)
glog.Infof("(kubelet) Read kv options %+v from %v", str, filename)
for _, line := range strings.Split(str, "\n") {
kv := strings.Split(line, "=")
if len(kv) != 2 {
glog.Warningf("Ignoring non key-value pair %v", kv)
continue
}
config[string(kv[0])] = string(kv[1])
}
return config, nil
}
// writeKVConfig writes a kv map as env variables into the given file.
func writeKVConfig(filename string, kv map[string]string) error {
if _, err := os.Stat(filename); err != nil {
return err
}
content := ""
for k, v := range kv {
content += fmt.Sprintf("%v=%v\n", k, v)
}
glog.Warningf("(kubelet)Writing kv options %+v to %v", content, filename)
return ioutil.WriteFile(filename, []byte(content), 0644)
}

View File

@ -217,6 +217,7 @@ func NewMainKubelet(
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
containerManager cm.ContainerManager,
useDefaultOverlay bool,
) (*Kubelet, error) {
if rootDirectory == "" {
@ -327,8 +328,16 @@ func NewMainKubelet(
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
containerManager: containerManager,
}
// Flannel options
// TODO: This is currently a dummy server.
flannelServer: NewFlannelServer(),
useDefaultOverlay: useDefaultOverlay,
}
if klet.kubeClient == nil {
glog.Infof("Master not setting up flannel overlay")
klet.useDefaultOverlay = false
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
@ -649,6 +658,10 @@ type Kubelet struct {
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
// Flannel options.
useDefaultOverlay bool
flannelServer *FlannelServer
}
func (kl *Kubelet) allSourcesReady() bool {
@ -1116,6 +1129,7 @@ func (kl *Kubelet) syncNodeStatus() {
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
glog.Infof("(kubelet) registering node with apiserver")
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
@ -2574,10 +2588,10 @@ func (kl *Kubelet) updateRuntimeUp() {
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if podCIDR == "" {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
glog.V(1).Info("(kubelet) PodCIDR not set. Will not configure cbr0.")
return nil
}
glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
glog.V(1).Infof("(kubelet) PodCIDR is set to %q", podCIDR)
_, cidr, err := net.ParseCIDR(podCIDR)
if err != nil {
return err
@ -2619,6 +2633,17 @@ var oldNodeUnschedulable bool
func (kl *Kubelet) syncNetworkStatus() {
var err error
if kl.configureCBR0 {
if kl.useDefaultOverlay {
glog.Infof("(kubelet) handshaking")
podCIDR, err := kl.flannelServer.Handshake()
if err != nil {
glog.Infof("Flannel server handshake failed %v", err)
return
}
glog.Infof("(kubelet) setting cidr, currently: %v -> %v",
kl.runtimeState.podCIDR(), podCIDR)
kl.runtimeState.setPodCIDR(podCIDR)
}
if err := ensureIPTablesMasqRule(); err != nil {
err = fmt.Errorf("Error on adding ip table rules: %v", err)
glog.Error(err)
@ -2884,9 +2909,13 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
if kl.reconcileCIDR {
// TODO: Actually update the node spec with pod cidr, this is currently a no-op.
if kl.useDefaultOverlay {
node.Spec.PodCIDR = kl.runtimeState.podCIDR()
} else if kl.reconcileCIDR {
kl.runtimeState.setPodCIDR(node.Spec.PodCIDR)
}
glog.Infof("(kubelet) updating node in apiserver with cidr %v", node.Spec.PodCIDR)
if err := kl.setNodeStatus(node); err != nil {
return err