Merge pull request #18795 from dcbw/cbr0-network-plugin

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2016-02-03 19:46:25 -08:00
23 changed files with 542 additions and 21 deletions

View File

@@ -1654,7 +1654,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.Do
netNamespace := ""
var ports []api.ContainerPort
if dm.networkPlugin.Name() == "cni" {
if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" {
netNamespace = "none"
}

View File

@@ -269,6 +269,13 @@ func NewMainKubelet(
oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)
// TODO: remove when internal cbr0 implementation gets removed in favor
// of the kubenet network plugin
if networkPluginName == "kubenet" {
configureCBR0 = false
flannelExperimentalOverlay = false
}
klet := &Kubelet{
hostname: hostname,
nodeName: nodeName,
@@ -398,7 +405,8 @@ func NewMainKubelet(
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, klet.isContainerRuntimeVersionCompatible)
klet.updatePodCIDR(podCIDR)
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
@@ -637,6 +645,7 @@ type Kubelet struct {
resolverConfig string
// Optionally shape the bandwidth of a pod
// TODO: remove when kubenet plugin is ready
shaper bandwidth.BandwidthShaper
// True if container cpu limits should be enforced via cgroup CFS quota
@@ -2594,6 +2603,8 @@ func (kl *Kubelet) updateRuntimeUp() {
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
// TODO: remove when kubenet plugin is ready
// NOTE!!! if you make changes here, also make them to kubenet
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if podCIDR == "" {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
@@ -2644,9 +2655,7 @@ func (kl *Kubelet) syncNetworkStatus() {
glog.Infof("Flannel server handshake failed %v", err)
return
}
glog.Infof("Setting cidr: %v -> %v",
kl.runtimeState.podCIDR(), podCIDR)
kl.runtimeState.setPodCIDR(podCIDR)
kl.updatePodCIDR(podCIDR)
}
if err := ensureIPTablesMasqRule(kl.nonMasqueradeCIDR); err != nil {
err = fmt.Errorf("Error on adding ip table rules: %v", err)
@@ -3026,7 +3035,7 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
}
}
} else if kl.reconcileCIDR {
kl.runtimeState.setPodCIDR(node.Spec.PodCIDR)
kl.updatePodCIDR(node.Spec.PodCIDR)
}
if err := kl.setNodeStatus(node); err != nil {
@@ -3445,6 +3454,21 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}
func (kl *Kubelet) updatePodCIDR(cidr string) {
if kl.runtimeState.podCIDR() == cidr {
return
}
glog.Infof("Setting Pod CIDR: %v -> %v", kl.runtimeState.podCIDR(), cidr)
kl.runtimeState.setPodCIDR(cidr)
if kl.networkPlugin != nil {
details := make(map[string]interface{})
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = cidr
kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
}
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")

View File

@@ -117,7 +117,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.hostname = testKubeletHostname
kubelet.nodeName = testKubeletHostname
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "" /* Pod CIDR */, func() error { return nil })
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, func() error { return nil })
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
@@ -2961,7 +2961,7 @@ func TestDockerRuntimeVersion(t *testing.T) {
},
}
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, "", kubelet.isContainerRuntimeVersionCompatible)
kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, false, kubelet.isContainerRuntimeVersionCompatible)
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -3424,7 +3424,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
},
},
}
kubelet.runtimeState = newRuntimeState(time.Duration(0), false, "" /* Pod CIDR */, func() error { return nil })
kubelet.runtimeState = newRuntimeState(time.Duration(0), false, func() error { return nil })
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)

View File

@@ -97,6 +97,9 @@ func (plugin *cniNetworkPlugin) Init(host network.Host) error {
return nil
}
func (plugin *cniNetworkPlugin) Event(name string, details map[string]interface{}) {
}
func (plugin *cniNetworkPlugin) Name() string {
return CNIPluginName
}

View File

@@ -120,6 +120,9 @@ func (plugin *execNetworkPlugin) getExecutable() string {
return path.Join(plugin.execPath, execName)
}
func (plugin *execNetworkPlugin) Event(name string, details map[string]interface{}) {
}
func (plugin *execNetworkPlugin) Name() string {
return plugin.execName
}

View File

@@ -0,0 +1,287 @@
// +build linux
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubenet
import (
"fmt"
"net"
"strings"
"syscall"
"github.com/vishvananda/netlink"
"github.com/appc/cni/libcni"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/util/bandwidth"
)
const (
KubenetPluginName = "kubenet"
BridgeName = "cbr0"
DefaultCNIDir = "/opt/cni/bin"
DefaultInterfaceName = "eth0"
)
type kubenetNetworkPlugin struct {
host network.Host
netConfig *libcni.NetworkConfig
cniConfig *libcni.CNIConfig
shaper bandwidth.BandwidthShaper
podCIDRs map[kubecontainer.DockerID]string
MTU int
}
func NewPlugin() network.NetworkPlugin {
return &kubenetNetworkPlugin{
podCIDRs: make(map[kubecontainer.DockerID]string),
MTU: 1460,
}
}
func (plugin *kubenetNetworkPlugin) Init(host network.Host) error {
plugin.host = host
plugin.cniConfig = &libcni.CNIConfig{
Path: []string{DefaultCNIDir},
}
if link, err := findMinMTU(); err == nil {
plugin.MTU = link.MTU
glog.V(5).Infof("Using interface %s MTU %d as bridge MTU", link.Name, link.MTU)
} else {
glog.Warningf("Failed to find default bridge MTU: %v", err)
}
return nil
}
func findMinMTU() (*net.Interface, error) {
intfs, err := net.Interfaces()
if err != nil {
return nil, err
}
mtu := 999999
defIntfIndex := -1
for i, intf := range intfs {
if ((intf.Flags & net.FlagUp) != 0) && (intf.Flags&(net.FlagLoopback|net.FlagPointToPoint) == 0) {
if intf.MTU < mtu {
mtu = intf.MTU
defIntfIndex = i
}
}
}
if mtu >= 999999 || mtu < 576 || defIntfIndex < 0 {
return nil, fmt.Errorf("no suitable interface", BridgeName)
}
return &intfs[defIntfIndex], nil
}
const NET_CONFIG_TEMPLATE = `{
"cniVersion": "0.1.0",
"name": "kubenet",
"type": "bridge",
"bridge": "%s",
"mtu": %d,
"addIf": "%s",
"isGateway": true,
"ipMasq": true,
"ipam": {
"type": "host-local",
"subnet": "%s",
"gateway": "%s",
"routes": [
{ "dst": "0.0.0.0/0" }
]
}
}`
func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interface{}) {
if name != network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE {
return
}
podCIDR, ok := details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR].(string)
if !ok {
glog.Warningf("%s event didn't contain pod CIDR", network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE)
return
}
if plugin.netConfig != nil {
glog.V(5).Infof("Ignoring subsequent pod CIDR update to %s", podCIDR)
return
}
glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
_, cidr, err := net.ParseCIDR(podCIDR)
if err == nil {
// Set bridge address to first address in IPNet
cidr.IP.To4()[3] += 1
json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.MTU, DefaultInterfaceName, podCIDR, cidr.IP.String())
plugin.netConfig, err = libcni.ConfFromBytes([]byte(json))
if err == nil {
glog.V(5).Infof("CNI network config:\n%s", json)
// Ensure cbr0 has no conflicting addresses; CNI's 'bridge'
// plugin will bail out if the bridge has an unexpected one
plugin.clearBridgeAddressesExcept(cidr.IP.String())
}
}
if err != nil {
glog.Warningf("Failed to generate CNI network config: %v", err)
}
}
func (plugin *kubenetNetworkPlugin) clearBridgeAddressesExcept(keep string) {
bridge, err := netlink.LinkByName(BridgeName)
if err != nil {
return
}
addrs, err := netlink.AddrList(bridge, syscall.AF_INET)
if err != nil {
return
}
for _, addr := range addrs {
if addr.IPNet.String() != keep {
glog.V(5).Infof("Removing old address %s from %s", addr.IPNet.String(), BridgeName)
netlink.AddrDel(bridge, &addr)
}
}
}
func (plugin *kubenetNetworkPlugin) Name() string {
return KubenetPluginName
}
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.DockerID) error {
// Can't set up pods if we don't have a PodCIDR yet
if plugin.netConfig == nil {
return fmt.Errorf("Kubenet needs a PodCIDR to set up pods")
}
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("Kubenet execution called on non-docker runtime")
}
netnsPath, err := runtime.GetNetNS(id.ContainerID())
if err != nil {
return err
}
rt := buildCNIRuntimeConf(name, namespace, id.ContainerID(), netnsPath)
if err != nil {
return fmt.Errorf("Error building CNI config: %v", err)
}
res, err := plugin.cniConfig.AddNetwork(plugin.netConfig, rt)
if err != nil {
return fmt.Errorf("Error adding container to network: %v", err)
}
if res.IP4 == nil {
return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id)
}
plugin.podCIDRs[id] = res.IP4.IP.String()
// The first SetUpPod call creates the bridge; ensure shaping is enabled
if plugin.shaper == nil {
plugin.shaper = bandwidth.NewTCShaper(BridgeName)
if plugin.shaper == nil {
return fmt.Errorf("Failed to create bandwidth shaper!")
}
plugin.shaper.ReconcileInterface()
}
// TODO: get ingress/egress from Pod.Spec and add pod CIDR to shaper
return nil
}
func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.DockerID) error {
if plugin.netConfig == nil {
return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods")
}
runtime, ok := plugin.host.GetRuntime().(*dockertools.DockerManager)
if !ok {
return fmt.Errorf("Kubenet execution called on non-docker runtime")
}
netnsPath, err := runtime.GetNetNS(id.ContainerID())
if err != nil {
return err
}
rt := buildCNIRuntimeConf(name, namespace, id.ContainerID(), netnsPath)
if err != nil {
return fmt.Errorf("Error building CNI config: %v", err)
}
// no cached CIDR is Ok during teardown
if cidr, ok := plugin.podCIDRs[id]; ok {
glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr)
// shaper wants /32
if addr, _, err := net.ParseCIDR(cidr); err != nil {
if err = plugin.shaper.Reset(fmt.Sprintf("%s/32", addr.String())); err != nil {
glog.Warningf("Failed to remove pod CIDR %s from shaper: %v", cidr, err)
}
}
}
delete(plugin.podCIDRs, id)
if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil {
return fmt.Errorf("Error removing container from network: %v", err)
}
return nil
}
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
func (plugin *kubenetNetworkPlugin) Status(namespace string, name string, id kubecontainer.DockerID) (*network.PodNetworkStatus, error) {
cidr, ok := plugin.podCIDRs[id]
if !ok {
return nil, fmt.Errorf("No IP address found for pod %v", id)
}
ip, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err != nil {
return nil, err
}
return &network.PodNetworkStatus{IP: ip}, nil
}
func buildCNIRuntimeConf(podName string, podNs string, podInfraContainerID kubecontainer.ContainerID, podNetnsPath string) *libcni.RuntimeConf {
glog.V(4).Infof("Kubenet: using netns path %v", podNetnsPath)
glog.V(4).Infof("Kubenet: using podns path %v", podNs)
return &libcni.RuntimeConf{
ContainerID: podInfraContainerID.ID,
NetNS: podNetnsPath,
IfName: DefaultInterfaceName,
}
}

View File

@@ -0,0 +1,55 @@
// +build !linux
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubenet
import (
"fmt"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
)
type kubenetNetworkPlugin struct {
}
func NewPlugin() network.NetworkPlugin {
return &kubenetNetworkPlugin{}
}
func (plugin *kubenetNetworkPlugin) Init(host network.Host) error {
return fmt.Errorf("Kubenet is not supported in this build")
}
func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interface{}) {
}
func (plugin *kubenetNetworkPlugin) Name() string {
return "kubenet"
}
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.DockerID) error {
return fmt.Errorf("Kubenet is not supported in this build")
}
func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.DockerID) error {
return fmt.Errorf("Kubenet is not supported in this build")
}
func (plugin *kubenetNetworkPlugin) Status(namespace string, name string, id kubecontainer.DockerID) (*network.PodNetworkStatus, error) {
return nil, fmt.Errorf("Kubenet is not supported in this build")
}

View File

@@ -33,12 +33,21 @@ import (
const DefaultPluginName = "kubernetes.io/no-op"
// Called when the node's Pod CIDR is known when using the
// controller manager's --allocate-node-cidrs=true option
const NET_PLUGIN_EVENT_POD_CIDR_CHANGE = "pod-cidr-change"
const NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR = "pod-cidr"
// Plugin is an interface to network plugins for the kubelet
type NetworkPlugin interface {
// Init initializes the plugin. This will be called exactly once
// before any other methods are called.
Init(host Host) error
// Called on various events like:
// NET_PLUGIN_EVENT_POD_CIDR_CHANGE
Event(name string, details map[string]interface{})
// Name returns the plugin's name. This will be used when searching
// for a plugin by name, e.g.
Name() string
@@ -130,6 +139,9 @@ func (plugin *noopNetworkPlugin) Init(host Host) error {
return nil
}
func (plugin *noopNetworkPlugin) Event(name string, details map[string]interface{}) {
}
func (plugin *noopNetworkPlugin) Name() string {
return DefaultPluginName
}

View File

@@ -94,7 +94,6 @@ func (s *runtimeState) errors() []string {
func newRuntimeState(
runtimeSyncThreshold time.Duration,
configureNetwork bool,
cidr string,
runtimeCompatibility func() error,
) *runtimeState {
var networkError error = nil
@@ -105,7 +104,6 @@ func newRuntimeState(
lastBaseRuntimeSync: time.Time{},
baseRuntimeSyncThreshold: runtimeSyncThreshold,
networkError: networkError,
cidr: cidr,
internalError: nil,
runtimeCompatibility: runtimeCompatibility,
}