From dba434c4ba0bf39275f25b3833fcc5f6eefa9379 Mon Sep 17 00:00:00 2001 From: "Khaled Henidak(Kal)" Date: Tue, 2 Jul 2019 15:45:42 +0000 Subject: [PATCH] kubenet for ipv6 dualstack --- pkg/kubelet/config/common_test.go | 16 + pkg/kubelet/container/runtime.go | 4 +- pkg/kubelet/dockershim/docker_sandbox.go | 66 ++- pkg/kubelet/dockershim/docker_sandbox_test.go | 6 +- pkg/kubelet/dockershim/helpers_linux.go | 4 +- pkg/kubelet/dockershim/helpers_unsupported.go | 4 +- pkg/kubelet/dockershim/helpers_windows.go | 16 +- pkg/kubelet/dockershim/network/BUILD | 2 + .../dockershim/network/cni/cni_others.go | 7 +- pkg/kubelet/dockershim/network/kubenet/BUILD | 4 + .../network/kubenet/kubenet_linux.go | 471 +++++++++++++----- .../network/kubenet/kubenet_linux_test.go | 110 ++-- pkg/kubelet/dockershim/network/plugins.go | 43 +- pkg/kubelet/kubelet.go | 14 +- pkg/kubelet/kubelet_node_status.go | 11 +- pkg/kubelet/kubelet_node_status_test.go | 11 +- pkg/kubelet/kubelet_pods.go | 11 +- .../kuberuntime/kuberuntime_manager.go | 28 +- .../kuberuntime/kuberuntime_manager_test.go | 17 +- .../kuberuntime/kuberuntime_sandbox.go | 38 +- pkg/kubelet/lifecycle/handlers.go | 4 +- pkg/kubelet/pleg/generic.go | 20 +- pkg/kubelet/pleg/generic_test.go | 117 +++-- pkg/util/node/node.go | 4 +- .../pkg/apis/testing/fake_runtime_service.go | 16 +- 25 files changed, 718 insertions(+), 326 deletions(-) diff --git a/pkg/kubelet/config/common_test.go b/pkg/kubelet/config/common_test.go index 709fdcf3315..2328a29698f 100644 --- a/pkg/kubelet/config/common_test.go +++ b/pkg/kubelet/config/common_test.go @@ -63,6 +63,14 @@ func TestDecodeSinglePod(t *testing.T) { SchedulerName: core.DefaultSchedulerName, EnableServiceLinks: &enableServiceLinks, }, + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + PodIPs: []v1.PodIP{ + { + IP: "1.2.3.4", + }, + }, + }, } json, err := runtime.Encode(testapi.Default.Codec(), pod) if err != nil { @@ -128,6 +136,14 @@ func TestDecodePodList(t *testing.T) { SchedulerName: core.DefaultSchedulerName, EnableServiceLinks: &enableServiceLinks, }, + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + PodIPs: []v1.PodIP{ + { + IP: "1.2.3.4", + }, + }, + }, } podList := &v1.PodList{ Items: []v1.Pod{*pod}, diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 4eb7c3cbea9..9699a5b5cfe 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -273,8 +273,8 @@ type PodStatus struct { Name string // Namespace of the pod. Namespace string - // IP of the pod. - IP string + // All IPs assigned to this pod + IPs []string // Status of containers in the pod. ContainerStatuses []*ContainerStatus // Status of the pod sandbox. diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 5d721148bb5..1824cdbadbf 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -324,65 +324,75 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem return nil, utilerrors.NewAggregate(errs) } -// getIPFromPlugin interrogates the network plugin for an IP. -func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (string, error) { +// getIPsFromPlugin interrogates the network plugin for sandbox IPs. 
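// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// The test fixtures above rely on the dual-stack convention that v1.PodStatus keeps the
// legacy PodIP field populated and equal to PodIPs[0]. A minimal helper enforcing that
// invariant could look like the following; syncPodIPFields is a hypothetical name, not a
// function introduced by this patch.
func syncPodIPFields(status *v1.PodStatus) {
	if len(status.PodIPs) == 0 {
		// only the legacy field may be set; promote it into the list
		if status.PodIP != "" {
			status.PodIPs = []v1.PodIP{{IP: status.PodIP}}
		}
		return
	}
	// the list is authoritative; the legacy field mirrors its first entry
	status.PodIP = status.PodIPs[0].IP
}
// ---------------------------------------------------------------------------------------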
+func (ds *dockerService) getIPsFromPlugin(sandbox *dockertypes.ContainerJSON) ([]string, error) { metadata, err := parseSandboxName(sandbox.Name) if err != nil { - return "", err + return nil, err } msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", metadata.Namespace, metadata.Name) cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID) networkStatus, err := ds.network.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID) if err != nil { - return "", err + return nil, err } if networkStatus == nil { - return "", fmt.Errorf("%v: invalid network status for", msg) + return nil, fmt.Errorf("%v: invalid network status for", msg) } - return networkStatus.IP.String(), nil + + ips := make([]string, 0) + for _, ip := range networkStatus.IPs { + ips = append(ips, ip.String()) + } + // if we don't have any ip in our list then cni is using classic primary IP only + if len(ips) == 0 { + ips = append(ips, networkStatus.IP.String()) + } + return ips, nil } -// getIP returns the ip given the output of `docker inspect` on a pod sandbox, +// getIPs returns the ip given the output of `docker inspect` on a pod sandbox, // first interrogating any registered plugins, then simply trusting the ip // in the sandbox itself. We look for an ipv4 address before ipv6. -func (ds *dockerService) getIP(podSandboxID string, sandbox *dockertypes.ContainerJSON) string { +func (ds *dockerService) getIPs(podSandboxID string, sandbox *dockertypes.ContainerJSON) []string { if sandbox.NetworkSettings == nil { - return "" + return nil } if networkNamespaceMode(sandbox) == runtimeapi.NamespaceMode_NODE { // For sandboxes using host network, the shim is not responsible for // reporting the IP. - return "" + return nil } // Don't bother getting IP if the pod is known and networking isn't ready ready, ok := ds.getNetworkReady(podSandboxID) if ok && !ready { - return "" + return nil } - ip, err := ds.getIPFromPlugin(sandbox) + ips, err := ds.getIPsFromPlugin(sandbox) if err == nil { - return ip + return ips } + ips = make([]string, 0) // TODO: trusting the docker ip is not a great idea. However docker uses // eth0 by default and so does CNI, so if we find a docker IP here, we // conclude that the plugin must have failed setup, or forgotten its ip. // This is not a sensible assumption for plugins across the board, but if // a plugin doesn't want this behavior, it can throw an error. if sandbox.NetworkSettings.IPAddress != "" { - return sandbox.NetworkSettings.IPAddress + ips = append(ips, sandbox.NetworkSettings.IPAddress) } if sandbox.NetworkSettings.GlobalIPv6Address != "" { - return sandbox.NetworkSettings.GlobalIPv6Address + ips = append(ips, sandbox.NetworkSettings.GlobalIPv6Address) } // If all else fails, warn but don't return an error, as pod status // should generally not return anything except fatal errors // FIXME: handle network errors by restarting the pod somehow? klog.Warningf("failed to read pod IP from plugin/docker: %v", err) - return "" + return ips } // Returns the inspect container response, the sandbox metadata, and network namespace mode @@ -422,11 +432,19 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P state = runtimeapi.PodSandboxState_SANDBOX_READY } - var IP string + var ips []string // TODO: Remove this when sandbox is available on windows // This is a workaround for windows, where sandbox is not in use, and pod IP is determined through containers belonging to the Pod. 
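// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// The sandbox status hunk just below splits a flat list of sandbox IPs into the CRI's
// single primary Ip plus an AdditionalIps list. Roughly, using the runtimeapi types
// referenced throughout this patch (toSandboxNetworkStatus is a hypothetical helper):
func toSandboxNetworkStatus(ips []string) *runtimeapi.PodSandboxNetworkStatus {
	status := &runtimeapi.PodSandboxNetworkStatus{AdditionalIps: []*runtimeapi.PodIP{}}
	if len(ips) == 0 {
		return status
	}
	status.Ip = ips[0] // first entry is treated as the primary address
	for _, ip := range ips[1:] {
		status.AdditionalIps = append(status.AdditionalIps, &runtimeapi.PodIP{Ip: ip})
	}
	return status
}
// ---------------------------------------------------------------------------------------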
- if IP = ds.determinePodIPBySandboxID(podSandboxID); IP == "" { - IP = ds.getIP(podSandboxID, r) + if ips = ds.determinePodIPBySandboxID(podSandboxID); len(ips) == 0 { + ips = ds.getIPs(podSandboxID, r) + } + + // ip is primary ips + // ips is all other ips + ip := "" + if len(ips) != 0 { + ip = ips[0] + ips = ips[1:] } labels, annotations := extractLabels(r.Config.Labels) @@ -438,7 +456,7 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P Labels: labels, Annotations: annotations, Network: &runtimeapi.PodSandboxNetworkStatus{ - Ip: IP, + Ip: ip, }, Linux: &runtimeapi.LinuxPodSandboxStatus{ Namespaces: &runtimeapi.Namespace{ @@ -450,6 +468,14 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P }, }, } + // add additional IPs + additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(ips)) + for _, ip := range ips { + additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{ + Ip: ip, + }) + } + status.Network.AdditionalIps = additionalPodIPs return &runtimeapi.PodSandboxStatusResponse{Status: status}, nil } diff --git a/pkg/kubelet/dockershim/docker_sandbox_test.go b/pkg/kubelet/dockershim/docker_sandbox_test.go index 687fac36b07..b34c933746d 100644 --- a/pkg/kubelet/dockershim/docker_sandbox_test.go +++ b/pkg/kubelet/dockershim/docker_sandbox_test.go @@ -106,7 +106,7 @@ func TestSandboxStatus(t *testing.T) { State: state, CreatedAt: ct, Metadata: config.Metadata, - Network: &runtimeapi.PodSandboxNetworkStatus{Ip: podIP}, + Network: &runtimeapi.PodSandboxNetworkStatus{Ip: podIP, AdditionalIps: []*runtimeapi.PodIP{}}, Linux: &runtimeapi.LinuxPodSandboxStatus{ Namespaces: &runtimeapi.Namespace{ Options: &runtimeapi.NamespaceOption{ @@ -142,6 +142,7 @@ func TestSandboxStatus(t *testing.T) { require.NoError(t, err) // IP not valid after sandbox stop expected.Network.Ip = "" + expected.Network.AdditionalIps = []*runtimeapi.PodIP{} statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id}) require.NoError(t, err) assert.Equal(t, expected, statusResp.Status) @@ -161,14 +162,13 @@ func TestSandboxStatusAfterRestart(t *testing.T) { config := makeSandboxConfig("foo", "bar", "1", 0) r := rand.New(rand.NewSource(0)).Uint32() podIP := fmt.Sprintf("10.%d.%d.%d", byte(r>>16), byte(r>>8), byte(r)) - state := runtimeapi.PodSandboxState_SANDBOX_READY ct := int64(0) expected := &runtimeapi.PodSandboxStatus{ State: state, CreatedAt: ct, Metadata: config.Metadata, - Network: &runtimeapi.PodSandboxNetworkStatus{Ip: podIP}, + Network: &runtimeapi.PodSandboxNetworkStatus{Ip: podIP, AdditionalIps: []*runtimeapi.PodIP{}}, Linux: &runtimeapi.LinuxPodSandboxStatus{ Namespaces: &runtimeapi.Namespace{ Options: &runtimeapi.NamespaceOption{ diff --git a/pkg/kubelet/dockershim/helpers_linux.go b/pkg/kubelet/dockershim/helpers_linux.go index b6d72b99350..34635413a00 100644 --- a/pkg/kubelet/dockershim/helpers_linux.go +++ b/pkg/kubelet/dockershim/helpers_linux.go @@ -136,8 +136,8 @@ func (ds *dockerService) updateCreateConfig( return nil } -func (ds *dockerService) determinePodIPBySandboxID(uid string) string { - return "" +func (ds *dockerService) determinePodIPBySandboxID(uid string) []string { + return nil } func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) { diff --git a/pkg/kubelet/dockershim/helpers_unsupported.go b/pkg/kubelet/dockershim/helpers_unsupported.go index 00c4a096002..04003d9fd51 100644 --- a/pkg/kubelet/dockershim/helpers_unsupported.go +++ 
b/pkg/kubelet/dockershim/helpers_unsupported.go @@ -45,9 +45,9 @@ func (ds *dockerService) updateCreateConfig( return nil } -func (ds *dockerService) determinePodIPBySandboxID(uid string) string { +func (ds *dockerService) determinePodIPBySandboxID(uid string) []string { klog.Warningf("determinePodIPBySandboxID is unsupported in this build") - return "" + return nil } func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) { diff --git a/pkg/kubelet/dockershim/helpers_windows.go b/pkg/kubelet/dockershim/helpers_windows.go index 2bdf4feaa68..92bc5c16eaa 100644 --- a/pkg/kubelet/dockershim/helpers_windows.go +++ b/pkg/kubelet/dockershim/helpers_windows.go @@ -97,7 +97,7 @@ func applyWindowsContainerSecurityContext(wsc *runtimeapi.WindowsContainerSecuri } } -func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string { +func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) []string { opts := dockertypes.ContainerListOptions{ All: true, Filters: dockerfilters.NewArgs(), @@ -108,7 +108,7 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string { f.AddLabel(sandboxIDLabelKey, sandboxID) containers, err := ds.client.ListContainers(opts) if err != nil { - return "" + return nil } for _, c := range containers { @@ -144,8 +144,8 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string { // Hyper-V only supports one container per Pod yet and the container will have a different // IP address from sandbox. Return the first non-sandbox container IP as POD IP. // TODO(feiskyer): remove this workaround after Hyper-V supports multiple containers per Pod. - if containerIP := ds.getIP(c.ID, r); containerIP != "" { - return containerIP + if containerIPs := ds.getIPs(c.ID, r); len(containerIPs) != 0 { + return containerIPs } } else { // Do not return any IP, so that we would continue and get the IP of the Sandbox. @@ -153,17 +153,17 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string { // to replicate the DNS registry key to the Workload container (IP/Gateway/MAC is // set separately than DNS). // TODO(feiskyer): remove this workaround after Namespace is supported in Windows RS5. 
- ds.getIP(sandboxID, r) + ds.getIPs(sandboxID, r) } } else { // ds.getIP will call the CNI plugin to fetch the IP - if containerIP := ds.getIP(c.ID, r); containerIP != "" { - return containerIP + if containerIPs := ds.getIPs(c.ID, r); len(containerIPs) != 0 { + return containerIPs } } } - return "" + return nil } func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) { diff --git a/pkg/kubelet/dockershim/network/BUILD b/pkg/kubelet/dockershim/network/BUILD index 330764a3c08..f8d3b314fd0 100644 --- a/pkg/kubelet/dockershim/network/BUILD +++ b/pkg/kubelet/dockershim/network/BUILD @@ -9,6 +9,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/network", visibility = ["//visibility:public"], deps = [ + "//pkg/features:go_default_library", "//pkg/kubelet/apis/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim/network/hostport:go_default_library", @@ -18,6 +19,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ], diff --git a/pkg/kubelet/dockershim/network/cni/cni_others.go b/pkg/kubelet/dockershim/network/cni/cni_others.go index 66e08c7c31f..f6f2ccd6595 100644 --- a/pkg/kubelet/dockershim/network/cni/cni_others.go +++ b/pkg/kubelet/dockershim/network/cni/cni_others.go @@ -69,12 +69,15 @@ func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name strin return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id) } - ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName) + ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName) if err != nil { return nil, err } - return &network.PodNetworkStatus{IP: ip}, nil + return &network.PodNetworkStatus{ + IP: ips[0], + IPs: ips, + }, nil } // buildDNSCapabilities builds cniDNSConfig from runtimeapi.DNSConfig. 
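// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// The cni_others.go hunk above fills network.PodNetworkStatus with both the primary IP
// and the full IPs list, with the convention IP == IPs[0]; GetPodIPs only returns a
// non-empty slice on success, which is what keeps the ips[0] access safe. A caller-side
// guard might look like this (primaryPodIP is a hypothetical helper; "net" is assumed):
func primaryPodIP(status *network.PodNetworkStatus) (net.IP, bool) {
	if status == nil {
		return nil, false
	}
	if len(status.IPs) > 0 {
		return status.IPs[0], true // IPs[0] is the primary by convention
	}
	if status.IP != nil {
		return status.IP, true // plugins that predate the IPs field still set IP only
	}
	return nil, false
}
// ---------------------------------------------------------------------------------------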
diff --git a/pkg/kubelet/dockershim/network/kubenet/BUILD b/pkg/kubelet/dockershim/network/kubenet/BUILD index 0b4c606aebe..9264f44ac73 100644 --- a/pkg/kubelet/dockershim/network/kubenet/BUILD +++ b/pkg/kubelet/dockershim/network/kubenet/BUILD @@ -36,6 +36,7 @@ go_library( "//pkg/kubelet/dockershim/network:go_default_library", ], "@io_bazel_rules_go//go/platform:linux": [ + "//pkg/features:go_default_library", "//pkg/kubelet/apis/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim/network:go_default_library", @@ -48,6 +49,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/containernetworking/cni/libcni:go_default_library", "//vendor/github.com/containernetworking/cni/pkg/types:go_default_library", "//vendor/github.com/containernetworking/cni/pkg/types/020:go_default_library", @@ -55,6 +57,7 @@ go_library( "//vendor/golang.org/x/sys/unix:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:nacl": [ "//pkg/kubelet/apis/config:go_default_library", @@ -105,6 +108,7 @@ go_test( "//pkg/util/bandwidth:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//pkg/util/sysctl/testing:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/mock:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go index a22719cf45d..573c443d18a 100644 --- a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux.go @@ -46,6 +46,10 @@ import ( utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilexec "k8s.io/utils/exec" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubefeatures "k8s.io/kubernetes/pkg/features" + netutils "k8s.io/utils/net" ) const ( @@ -63,6 +67,29 @@ const ( // defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam // https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends defaultIPAMDir = "/var/lib/cni/networks" + + zeroCIDRv6 = "::/0" + zeroCIDRv4 = "0.0.0.0/0" + + NET_CONFIG_TEMPLATE = `{ + "cniVersion": "0.1.0", + "name": "kubenet", + "type": "bridge", + "bridge": "%s", + "mtu": %d, + "addIf": "%s", + "isGateway": true, + "ipMasq": false, + "hairpinMode": %t, + "ipam": { + "type": "host-local", + "ranges": [%s], + "routes": [ + { "dst": "%s" }, + { "dst": "%s" } + ] + } +}` ) // CNI plugins required by kubenet in /opt/cni/bin or user-specified directory @@ -77,7 +104,7 @@ type kubenetNetworkPlugin struct { cniConfig libcni.CNI bandwidthShaper bandwidth.Shaper mu sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization - podIPs map[kubecontainer.ContainerID]string + podIPs map[kubecontainer.ContainerID]utilsets.String mtu int execer utilexec.Interface nsenterPath string @@ -88,33 +115,36 @@ type kubenetNetworkPlugin struct { hostportSyncer 
hostport.HostportSyncer hostportManager hostport.HostPortManager iptables utiliptables.Interface + iptablesv6 utiliptables.Interface sysctl utilsysctl.Interface ebtables utilebtables.Interface // binDirs is passed by kubelet cni-bin-dir parameter. // kubenet will search for CNI binaries in DefaultCNIDir first, then continue to binDirs. binDirs []string nonMasqueradeCIDR string - podCidr string - gateway net.IP cacheDir string + podCIDRs []*net.IPNet + podGateways []net.IP } func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugin { - protocol := utiliptables.ProtocolIpv4 execer := utilexec.New() dbus := utildbus.New() - sysctl := utilsysctl.New() - iptInterface := utiliptables.New(execer, dbus, protocol) + iptInterface := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4) + iptInterfacev6 := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6) return &kubenetNetworkPlugin{ - podIPs: make(map[kubecontainer.ContainerID]string), + podIPs: make(map[kubecontainer.ContainerID]utilsets.String), execer: utilexec.New(), iptables: iptInterface, - sysctl: sysctl, + iptablesv6: iptInterfacev6, + sysctl: utilsysctl.New(), binDirs: append([]string{DefaultCNIDir}, networkPluginDirs...), hostportSyncer: hostport.NewHostportSyncer(iptInterface), hostportManager: hostport.NewHostportManager(iptInterface), nonMasqueradeCIDR: "10.0.0.0/8", cacheDir: cacheDir, + podCIDRs: make([]*net.IPNet, 0), + podGateways: make([]net.IP, 0), } } @@ -171,8 +201,14 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode kubeletc // TODO: move thic logic into cni bridge plugin and remove this from kubenet func (plugin *kubenetNetworkPlugin) ensureMasqRule() error { - if plugin.nonMasqueradeCIDR != "0.0.0.0/0" { - if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, + if plugin.nonMasqueradeCIDR != zeroCIDRv4 && plugin.nonMasqueradeCIDR != zeroCIDRv6 { + // switch according to target nonMasqueradeCidr ip family + ipt := plugin.iptables + if netutils.IsIPv6CIDRString(plugin.nonMasqueradeCIDR) { + ipt = plugin.iptablesv6 + } + + if _, err := ipt.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, "-m", "comment", "--comment", "kubenet: SNAT for outbound traffic from cluster", "-m", "addrtype", "!", "--dst-type", "LOCAL", "!", "-d", plugin.nonMasqueradeCIDR, @@ -207,27 +243,8 @@ func findMinMTU() (*net.Interface, error) { return &intfs[defIntfIndex], nil } -const NET_CONFIG_TEMPLATE = `{ - "cniVersion": "0.1.0", - "name": "kubenet", - "type": "bridge", - "bridge": "%s", - "mtu": %d, - "addIf": "%s", - "isGateway": true, - "ipMasq": false, - "hairpinMode": %t, - "ipam": { - "type": "host-local", - "subnet": "%s", - "gateway": "%s", - "routes": [ - { "dst": "0.0.0.0/0" } - ] - } -}` - func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interface{}) { + var err error if name != network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE { return } @@ -246,33 +263,54 @@ func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interf return } - klog.V(5).Infof("PodCIDR is set to %q", podCIDR) - _, cidr, err := net.ParseCIDR(podCIDR) - if err == nil { - setHairpin := plugin.hairpinMode == kubeletconfig.HairpinVeth - // Set bridge address to first address in IPNet - cidr.IP[len(cidr.IP)-1] += 1 + klog.V(4).Infof("kubenet: PodCIDR is set to %q", podCIDR) + podCIDRs := strings.Split(podCIDR, ",") - json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.mtu, 
network.DefaultInterfaceName, setHairpin, podCIDR, cidr.IP.String()) - klog.V(2).Infof("CNI network config set to %v", json) - plugin.netConfig, err = libcni.ConfFromBytes([]byte(json)) - if err == nil { - klog.V(5).Infof("CNI network config:\n%s", json) - - // Ensure cbr0 has no conflicting addresses; CNI's 'bridge' - // plugin will bail out if the bridge has an unexpected one - plugin.clearBridgeAddressesExcept(cidr) - } - plugin.podCidr = podCIDR - plugin.gateway = cidr.IP + // reset to one cidr if dual stack is not enabled + if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) && len(podCIDRs) > 1 { + klog.V(2).Infof("This node has multiple pod cidrs assigned and dual stack is not enabled. ignoring all except first cidr") + podCIDRs = podCIDRs[0:1] } + for idx, currentPodCIDR := range podCIDRs { + _, cidr, err := net.ParseCIDR(currentPodCIDR) + if nil != err { + klog.Warningf("Failed to generate CNI network config with cidr %s at indx:%v: %v", currentPodCIDR, idx, err) + return + } + // create list of ips and gateways + cidr.IP[len(cidr.IP)-1] += 1 // Set bridge address to first address in IPNet + plugin.podCIDRs = append(plugin.podCIDRs, cidr) + plugin.podGateways = append(plugin.podGateways, cidr.IP) + } + + //setup hairpinMode + setHairpin := plugin.hairpinMode == kubeletconfig.HairpinVeth + + json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.mtu, network.DefaultInterfaceName, setHairpin, plugin.getRangesConfig(), zeroCIDRv4, zeroCIDRv6) + klog.V(4).Infof("CNI network config set to %v", json) + plugin.netConfig, err = libcni.ConfFromBytes([]byte(json)) if err != nil { - klog.Warningf("Failed to generate CNI network config: %v", err) + klog.Warningf("** failed to set up CNI with %v err:%v", json, err) + // just incase it was set by mistake + plugin.netConfig = nil + // we bail out by clearing the *entire* list + // of addresses assigned to cbr0 + plugin.clearUnusedBridgeAddresses() } } -func (plugin *kubenetNetworkPlugin) clearBridgeAddressesExcept(keep *net.IPNet) { +// clear all address on bridge except those operated on by kubenet +func (plugin *kubenetNetworkPlugin) clearUnusedBridgeAddresses() { + cidrIncluded := func(list []*net.IPNet, check *net.IPNet) bool { + for _, thisNet := range list { + if utilnet.IPNetEqual(thisNet, check) { + return true + } + } + return false + } + bridge, err := netlink.LinkByName(BridgeName) if err != nil { return @@ -280,11 +318,12 @@ func (plugin *kubenetNetworkPlugin) clearBridgeAddressesExcept(keep *net.IPNet) addrs, err := netlink.AddrList(bridge, unix.AF_INET) if err != nil { + klog.V(2).Infof("attempting to get address for interface: %s failed with err:%v", BridgeName, err) return } for _, addr := range addrs { - if !utilnet.IPNetEqual(addr.IPNet, keep) { + if !cidrIncluded(plugin.podCIDRs, addr.IPNet) { klog.V(2).Infof("Removing old address %s from %s", addr.IPNet.String(), BridgeName) netlink.AddrDel(bridge, &addr) } @@ -301,6 +340,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int { // setup sets up networking through CNI using the given ns/name and sandbox ID. func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { + var ipv4, ipv6 net.IP // Disable DAD so we skip the kernel delay on bringing up new interfaces. 
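// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// For a node assigned the example CIDRs 10.244.1.0/24 and 2001:db8:1::/64, the
// NET_CONFIG_TEMPLATE and getRangesConfig above would render roughly the following
// host-local IPAM config: one "ranges" entry per family and a default route for each
// family. All concrete values here (CIDRs, MTU, hairpin setting) are assumptions used
// only for illustration.
const exampleDualStackKubenetConf = `{
  "cniVersion": "0.1.0",
  "name": "kubenet",
  "type": "bridge",
  "bridge": "cbr0",
  "mtu": 1460,
  "addIf": "eth0",
  "isGateway": true,
  "ipMasq": false,
  "hairpinMode": false,
  "ipam": {
    "type": "host-local",
    "ranges": [
      [{ "subnet": "10.244.1.0/24", "gateway": "10.244.1.1" }],
      [{ "subnet": "2001:db8:1::/64", "gateway": "2001:db8:1::1" }]
    ],
    "routes": [
      { "dst": "0.0.0.0/0" },
      { "dst": "::/0" }
    ]
  }
}`
// ---------------------------------------------------------------------------------------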
if err := plugin.disableContainerDAD(id); err != nil { klog.V(3).Infof("Failed to disable DAD in container: %v", err) @@ -321,14 +361,19 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube if err != nil { return fmt.Errorf("unable to understand network config: %v", err) } - if res.IP4 == nil { - return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) - } - ip4 := res.IP4.IP.IP.To4() - if ip4 == nil { - return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4) + //TODO: v1.16 (khenidak) update NET_CONFIG_TEMPLATE to CNI version 0.3.0 or later so + // that we get multiple IP addresses in the returned Result structure + if res.IP4 != nil { + ipv4 = res.IP4.IP.IP.To4() } + if res.IP6 != nil { + ipv6 = res.IP6.IP.IP + } + + if ipv4 == nil && ipv6 == nil { + return fmt.Errorf("cni didn't report ipv4 ipv6") + } // Put the container bridge into promiscuous mode to force it to accept hairpin packets. // TODO: Remove this once the kernel bug (#20096) is fixed. if plugin.hairpinMode == kubeletconfig.PromiscuousBridge { @@ -348,57 +393,97 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube plugin.syncEbtablesDedupRules(link.Attrs().HardwareAddr) } - plugin.podIPs[id] = ip4.String() + // add the ip to tracked ips + if ipv4 != nil { + plugin.addPodIP(id, ipv4.String()) + } + if ipv6 != nil { + plugin.addPodIP(id, ipv6.String()) + } - // The first SetUpPod call creates the bridge; get a shaper for the sake of initialization - // TODO: replace with CNI traffic shaper plugin + if err := plugin.addTrafficShaping(id, annotations); err != nil { + return err + } + + return plugin.addPortMapping(id, name, namespace) +} + +// The first SetUpPod call creates the bridge; get a shaper for the sake of initialization +// TODO: replace with CNI traffic shaper plugin +func (plugin *kubenetNetworkPlugin) addTrafficShaping(id kubecontainer.ContainerID, annotations map[string]string) error { shaper := plugin.shaper() ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations) if err != nil { return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) } - if egress != nil || ingress != nil { - if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { - return fmt.Errorf("Failed to add pod to shaper: %v", err) - } + iplist, exists := plugin.getCachedPodIPs(id) + if !exists { + return fmt.Errorf("pod %s does not have recorded ips", id) } - // TODO: replace with CNI port-forwarding plugin - portMappings, err := plugin.host.GetPodPortMappings(id.ID) - if err != nil { - return err - } - if portMappings != nil && len(portMappings) > 0 { - if err := plugin.hostportManager.Add(id.ID, &hostport.PodPortMapping{ - Namespace: namespace, - Name: name, - PortMappings: portMappings, - IP: ip4, - HostNetwork: false, - }, BridgeName); err != nil { - return err + if egress != nil || ingress != nil { + for _, ip := range iplist { + mask := 32 + if netutils.IsIPv6String(ip) { + mask = 128 + } + if err != nil { + return fmt.Errorf("failed to setup traffic shaping for pod ip%s", ip) + } + + if err := shaper.ReconcileCIDR(fmt.Sprintf("%v/%v", ip, mask), egress, ingress); err != nil { + return fmt.Errorf("Failed to add pod to shaper: %v", err) + } } } return nil } -func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error { - plugin.mu.Lock() - defer 
plugin.mu.Unlock() +// TODO: replace with CNI port-forwarding plugin +func (plugin *kubenetNetworkPlugin) addPortMapping(id kubecontainer.ContainerID, name, namespace string) error { + portMappings, err := plugin.host.GetPodPortMappings(id.ID) + if err != nil { + return err + } + if len(portMappings) == 0 { + return nil + } + + iplist, exists := plugin.getCachedPodIPs(id) + if !exists { + return fmt.Errorf("pod %s does not have recorded ips", id) + } + + for _, ip := range iplist { + pm := &hostport.PodPortMapping{ + Namespace: namespace, + Name: name, + PortMappings: portMappings, + IP: net.ParseIP(ip), + HostNetwork: false, + } + if err := plugin.hostportManager.Add(id.ID, pm, BridgeName); err != nil { + return err + } + } + + return nil +} + +func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error { start := time.Now() - defer func() { - klog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) - }() if err := plugin.Status(); err != nil { return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) } + defer func() { + klog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) + }() + if err := plugin.setup(namespace, name, id, annotations); err != nil { - // Make sure everything gets cleaned up on errors - podIP, _ := plugin.podIPs[id] - if err := plugin.teardown(namespace, name, id, podIP); err != nil { + if err := plugin.teardown(namespace, name, id); err != nil { // Not a hard error or warning klog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err) } @@ -415,27 +500,12 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k // Tears down as much of a pod's network as it can even if errors occur. Returns // an aggregate error composed of all errors encountered during the teardown. -func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error { +func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID) error { errList := []error{} - if podIP != "" { - klog.V(5).Infof("Removing pod IP %s from shaper", podIP) - // shaper wants /32 - if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil { - // Possible bandwidth shaping wasn't enabled for this pod anyways - klog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err) - } - - delete(plugin.podIPs, id) - } - + // no ip dependent actions if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { - // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution. - if podIP != "" { - klog.Warningf("Failed to delete container from kubenet: %v", err) - } else { - errList = append(errList, err) - } + errList = append(errList, err) } portMappings, err := plugin.host.GetPodPortMappings(id.ID) @@ -451,13 +521,33 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k errList = append(errList, err) } } + + iplist, exists := plugin.getCachedPodIPs(id) + if !exists || len(iplist) == 0 { + klog.V(5).Infof("container %s (%s/%s) does not have recorded. 
ignoring teardown call", id, name, namespace) + return nil + } + + for _, ip := range iplist { + klog.V(5).Infof("Removing pod IP %s from shaper for (%s/%s)", ip, name, namespace) + // shaper uses a cidr, but we are using a single IP. + isV6 := netutils.IsIPv6String(ip) + mask := "32" + if isV6 { + mask = "128" + } + + if err := plugin.shaper().Reset(fmt.Sprintf("%s/%s", ip, mask)); err != nil { + // Possible bandwidth shaping wasn't enabled for this pod anyways + klog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", ip, err) + } + + plugin.removePodIP(id, ip) + } return utilerrors.NewAggregate(errList) } func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error { - plugin.mu.Lock() - defer plugin.mu.Unlock() - start := time.Now() defer func() { klog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name) @@ -467,9 +557,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods") } - // no cached IP is Ok during teardown - podIP, _ := plugin.podIPs[id] - if err := plugin.teardown(namespace, name, id, podIP); err != nil { + if err := plugin.teardown(namespace, name, id); err != nil { return err } @@ -477,20 +565,19 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i if err := plugin.ensureMasqRule(); err != nil { klog.Errorf("Failed to ensure MASQ rule: %v", err) } - return nil } // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin. // Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { - plugin.mu.Lock() - defer plugin.mu.Unlock() - // Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first. - if podIP, ok := plugin.podIPs[id]; ok { - return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil + // try cached version + networkStatus := plugin.getNetworkStatus(id) + if networkStatus != nil { + return networkStatus, nil } + // not a cached version, get via network ns netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) @@ -498,13 +585,46 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s if netnsPath == "" { return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id) } - ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName) + ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName) if err != nil { return nil, err } - plugin.podIPs[id] = ip.String() - return &network.PodNetworkStatus{IP: ip}, nil + // cache the ips + for _, ip := range ips { + plugin.addPodIP(id, ip.String()) + } + + // return from cached + return plugin.getNetworkStatus(id), nil +} + +// returns networkstatus +func (plugin *kubenetNetworkPlugin) getNetworkStatus(id kubecontainer.ContainerID) *network.PodNetworkStatus { + // Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first. 
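// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// The shaper and hostport code paths above need a host-route CIDR for a single address:
// /32 for IPv4 and /128 for IPv6. A small helper capturing that per-family choice
// (hostCIDR is a hypothetical name; netutils is the k8s.io/utils/net package already
// imported by this file):
func hostCIDR(ip string) string {
	if netutils.IsIPv6String(ip) {
		return ip + "/128" // a single IPv6 address
	}
	return ip + "/32" // a single IPv4 address
}
// Usage would then be, for example, plugin.shaper().Reset(hostCIDR(ip)).
// ---------------------------------------------------------------------------------------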
+ iplist, ok := plugin.getCachedPodIPs(id) + if !ok { + return nil + } + // sort making v4 first + // TODO: (khenidak) IPv6 beta stage. + // This - forced sort - could be avoided by checking which cidr that an IP belongs + // to, then placing the IP according to cidr index. But before doing that. Check how IP is collected + // across all of kubelet code (against cni and cri). + ips := make([]net.IP, 0) + for _, ip := range iplist { + isV6 := netutils.IsIPv6String(ip) + if !isV6 { + ips = append([]net.IP{net.ParseIP(ip)}, ips...) + } else { + ips = append(ips, net.ParseIP(ip)) + } + } + + return &network.PodNetworkStatus{ + IP: ips[0], + IPs: ips, + } } func (plugin *kubenetNetworkPlugin) Status() error { @@ -571,11 +691,8 @@ func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.Network } klog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt) - // The network plugin can take up to 3 seconds to execute, - // so yield the lock while it runs. - plugin.mu.Unlock() + res, err := plugin.cniConfig.AddNetwork(context.TODO(), config, rt) - plugin.mu.Lock() if err != nil { return nil, fmt.Errorf("Error adding container to network: %v", err) } @@ -600,8 +717,9 @@ func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.Netwo // shaper retrieves the bandwidth shaper and, if it hasn't been fetched before, // initializes it and ensures the bridge is appropriately configured -// This function should only be called while holding the `plugin.mu` lock func (plugin *kubenetNetworkPlugin) shaper() bandwidth.Shaper { + plugin.mu.Lock() + defer plugin.mu.Unlock() if plugin.bandwidthShaper == nil { plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName) plugin.bandwidthShaper.ReconcileInterface() @@ -624,30 +742,43 @@ func (plugin *kubenetNetworkPlugin) syncEbtablesDedupRules(macAddr net.HardwareA return } - klog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), plugin.gateway.String(), plugin.podCidr) + // ensure custom chain exists _, err = plugin.ebtables.EnsureChain(utilebtables.TableFilter, dedupChain) if err != nil { klog.Errorf("Failed to ensure %v chain %v", utilebtables.TableFilter, dedupChain) return } + // jump to custom chain to the chain from core tables _, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, utilebtables.ChainOutput, "-j", string(dedupChain)) if err != nil { klog.Errorf("Failed to ensure %v chain %v jump to %v chain: %v", utilebtables.TableFilter, utilebtables.ChainOutput, dedupChain, err) return } - commonArgs := []string{"-p", "IPv4", "-s", macAddr.String(), "-o", "veth+"} - _, err = plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.gateway.String(), "-j", "ACCEPT")...) - if err != nil { - klog.Errorf("Failed to ensure packets from cbr0 gateway to be accepted") - return + // per gateway rule + for idx, gw := range plugin.podGateways { + klog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), gw.String(), plugin.podCIDRs[idx].String()) - } - _, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.podCidr, "-j", "DROP")...) 
- if err != nil { - klog.Errorf("Failed to ensure packets from podCidr but has mac address of cbr0 to get dropped.") - return + bIsV6 := netutils.IsIPv6(gw) + IPFamily := "IPv4" + ipSrc := "--ip-src" + if bIsV6 { + IPFamily = "IPv6" + ipSrc = "--ip6-src" + } + commonArgs := []string{"-p", IPFamily, "-s", macAddr.String(), "-o", "veth+"} + _, err = plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, gw.String(), "-j", "ACCEPT")...) + if err != nil { + klog.Errorf("Failed to ensure packets from cbr0 gateway:%v to be accepted with error:%v", gw.String(), err) + return + + } + _, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, plugin.podCIDRs[idx].String(), "-j", "DROP")...) + if err != nil { + klog.Errorf("Failed to ensure packets from podCidr[%v] but has mac address of cbr0 to get dropped. err:%v", plugin.podCIDRs[idx].String(), err) + return + } } } @@ -690,3 +821,71 @@ func (plugin *kubenetNetworkPlugin) disableContainerDAD(id kubecontainer.Contain } return nil } + +// given a n cidrs assigned to nodes, +// create bridge configuration that conforms to them +func (plugin *kubenetNetworkPlugin) getRangesConfig() string { + createRange := func(thisNet *net.IPNet) string { + template := ` +[{ +"subnet": "%s", +"gateway": "%s" +}]` + return fmt.Sprintf(template, thisNet.String(), thisNet.IP.String()) + } + + ranges := make([]string, len(plugin.podCIDRs)) + for idx, thisCIDR := range plugin.podCIDRs { + ranges[idx] = createRange(thisCIDR) + } + //[{range}], [{range}] + // each range is a subnet and a gateway + return strings.Join(ranges[:], ",") +} + +func (plugin *kubenetNetworkPlugin) addPodIP(id kubecontainer.ContainerID, ip string) { + plugin.mu.Lock() + defer plugin.mu.Unlock() + + _, exist := plugin.podIPs[id] + if !exist { + plugin.podIPs[id] = utilsets.NewString() + } + + if !plugin.podIPs[id].Has(ip) { + plugin.podIPs[id].Insert(ip) + } +} + +func (plugin *kubenetNetworkPlugin) removePodIP(id kubecontainer.ContainerID, ip string) { + plugin.mu.Lock() + defer plugin.mu.Unlock() + + _, exist := plugin.podIPs[id] + if !exist { + return // did we restart kubelet? + } + + if plugin.podIPs[id].Has(ip) { + plugin.podIPs[id].Delete(ip) + } + + // if there is no more ips here. 
let us delete + if plugin.podIPs[id].Len() == 0 { + delete(plugin.podIPs, id) + } +} + +// returns a copy of pod ips +// false is returned if id does not exist +func (plugin *kubenetNetworkPlugin) getCachedPodIPs(id kubecontainer.ContainerID) ([]string, bool) { + plugin.mu.Lock() + defer plugin.mu.Unlock() + + iplist, exists := plugin.podIPs[id] + if !exists { + return nil, false + } + + return iplist.UnsortedList(), true +} diff --git a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux_test.go index cc91b74f624..7fa70e9c59a 100644 --- a/pkg/kubelet/dockershim/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/dockershim/network/kubenet/kubenet_linux_test.go @@ -18,12 +18,13 @@ package kubenet import ( "fmt" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "strings" "testing" + utilsets "k8s.io/apimachinery/pkg/util/sets" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockershim/network" @@ -40,7 +41,7 @@ import ( // test it fulfills the NetworkPlugin interface var _ network.NetworkPlugin = &kubenetNetworkPlugin{} -func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) *kubenetNetworkPlugin { +func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]utilsets.String, execer exec.Interface, host network.Host) *kubenetNetworkPlugin { return &kubenetNetworkPlugin{ podIPs: initMap, execer: execer, @@ -50,31 +51,38 @@ func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer e } func TestGetPodNetworkStatus(t *testing.T) { - podIPMap := make(map[kubecontainer.ContainerID]string) - podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2" - podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3" + podIPMap := make(map[kubecontainer.ContainerID]utilsets.String) + podIPMap[kubecontainer.ContainerID{ID: "1"}] = utilsets.NewString("10.245.0.2") + podIPMap[kubecontainer.ContainerID{ID: "2"}] = utilsets.NewString("10.245.0.3") + podIPMap[kubecontainer.ContainerID{ID: "3"}] = utilsets.NewString("10.245.0.4", "2000::") testCases := []struct { id string expectError bool - expectIP string + expectIP utilsets.String }{ //in podCIDR map { - "1", - false, - "10.245.0.2", + id: "1", + expectError: false, + expectIP: utilsets.NewString("10.245.0.2"), }, { - "2", - false, - "10.245.0.3", + id: "2", + expectError: false, + expectIP: utilsets.NewString("10.245.0.3"), }, - //not in podCIDR map { - "3", - true, - "", + id: "3", + expectError: false, + expectIP: utilsets.NewString("10.245.0.4", "2000::"), + }, + + //not in podIP map + { + id: "does-not-exist-map", + expectError: true, + expectIP: nil, }, //TODO: add test cases for retrieving ip inside container network namespace } @@ -85,11 +93,12 @@ func TestGetPodNetworkStatus(t *testing.T) { fCmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { - ip, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}] + ips, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}] if !ok { return nil, fmt.Errorf("Pod IP %q not found", t.id) } - return []byte(ip), nil + ipsList := ips.UnsortedList() + return []byte(ipsList[0]), nil }, }, } @@ -119,9 +128,20 @@ func TestGetPodNetworkStatus(t *testing.T) { t.Errorf("Test case %d expects error but got error: %v", i, err) } } - if tc.expectIP != out.IP.String() { + seen := make(map[string]bool) + 
allExpected := tc.expectIP.UnsortedList() + for _, expectedIP := range allExpected { + for _, outIP := range out.IPs { + if expectedIP == outIP.String() { + seen[expectedIP] = true + break + } + } + } + if len(tc.expectIP) != len(seen) { t.Errorf("Test case %d expects ip %s but got %s", i, tc.expectIP, out.IP.String()) } + } } @@ -137,7 +157,8 @@ func TestTeardownCallsShaper(t *testing.T) { fhost := nettest.NewFakeHost(nil) fshaper := &bandwidth.FakeShaper{} mockcni := &mock_cni.MockCNI{} - kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost) + ips := make(map[kubecontainer.ContainerID]utilsets.String) + kubenet := newFakeKubenetPlugin(ips, fexec, fhost) kubenet.cniConfig = mockcni kubenet.iptables = ipttest.NewFake() kubenet.bandwidthShaper = fshaper @@ -150,7 +171,7 @@ func TestTeardownCallsShaper(t *testing.T) { kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details) existingContainerID := kubecontainer.BuildContainerID("docker", "123") - kubenet.podIPs[existingContainerID] = "10.0.0.1" + kubenet.podIPs[existingContainerID] = utilsets.NewString("10.0.0.1") if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil { t.Fatalf("Unexpected error in TearDownPod: %v", err) @@ -185,7 +206,8 @@ func TestInit_MTU(t *testing.T) { } fhost := nettest.NewFakeHost(nil) - kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost) + ips := make(map[kubecontainer.ContainerID]utilsets.String) + kubenet := newFakeKubenetPlugin(ips, fexec, fhost) kubenet.iptables = ipttest.NewFake() sysctl := sysctltest.NewFake() @@ -203,22 +225,23 @@ func TestInit_MTU(t *testing.T) { // This is how kubenet is invoked from the cri. func TestTearDownWithoutRuntime(t *testing.T) { testCases := []struct { - podCIDR string + podCIDR []string ip string - expectedGateway string + expectedGateway []string }{ { - podCIDR: "10.0.0.1/24", + podCIDR: []string{"10.0.0.1/24"}, ip: "10.0.0.1", - expectedGateway: "10.0.0.1", + expectedGateway: []string{"10.0.0.1"}, }, { - podCIDR: "2001:beef::1/48", + podCIDR: []string{"2001:beef::1/48"}, ip: "2001:beef::1", - expectedGateway: "2001:beef::1", + expectedGateway: []string{"2001:beef::1"}, }, } for _, tc := range testCases { + fhost := nettest.NewFakeHost(nil) fhost.Legacy = false mockcni := &mock_cni.MockCNI{} @@ -230,22 +253,39 @@ func TestTearDownWithoutRuntime(t *testing.T) { }, } - kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost) + ips := make(map[kubecontainer.ContainerID]utilsets.String) + kubenet := newFakeKubenetPlugin(ips, fexec, fhost) kubenet.cniConfig = mockcni kubenet.iptables = ipttest.NewFake() details := make(map[string]interface{}) - details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = tc.podCIDR + details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = strings.Join(tc.podCIDR, ",") kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details) - if kubenet.gateway.String() != tc.expectedGateway { - t.Errorf("generated gateway: %q, expecting: %q", kubenet.gateway.String(), tc.expectedGateway) + if len(kubenet.podGateways) != len(tc.expectedGateway) { + t.Errorf("generated gateway: %q, expecting: %q are not of the same length", kubenet.podGateways, tc.expectedGateway) + continue } - if kubenet.podCidr != tc.podCIDR { - t.Errorf("generated podCidr: %q, expecting: %q", kubenet.podCidr, tc.podCIDR) + + for idx := range tc.expectedGateway { + if kubenet.podGateways[idx].String() != tc.expectedGateway[idx] { + 
t.Errorf("generated gateway: %q, expecting: %q", kubenet.podGateways[idx].String(), tc.expectedGateway[idx]) + + } } + + if len(kubenet.podCIDRs) != len(tc.podCIDR) { + t.Errorf("generated podCidr: %q, expecting: %q are not of the same length", kubenet.podCIDRs, tc.podCIDR) + continue + } + for idx := range tc.podCIDR { + if kubenet.podCIDRs[idx].String() != tc.podCIDR[idx] { + t.Errorf("generated podCidr: %q, expecting: %q", kubenet.podCIDRs[idx].String(), tc.podCIDR[idx]) + } + } + existingContainerID := kubecontainer.BuildContainerID("docker", "123") - kubenet.podIPs[existingContainerID] = tc.ip + kubenet.podIPs[existingContainerID] = utilsets.NewString(tc.ip) mockcni.On("DelNetwork", mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) diff --git a/pkg/kubelet/dockershim/network/plugins.go b/pkg/kubelet/dockershim/network/plugins.go index de4a53d0199..18d921166dc 100644 --- a/pkg/kubelet/dockershim/network/plugins.go +++ b/pkg/kubelet/dockershim/network/plugins.go @@ -34,6 +34,9 @@ import ( "k8s.io/kubernetes/pkg/kubelet/dockershim/network/metrics" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilexec "k8s.io/utils/exec" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubefeatures "k8s.io/kubernetes/pkg/features" ) const ( @@ -89,6 +92,8 @@ type PodNetworkStatus struct { // - service endpoints are constructed with // - will be reported in the PodStatus.PodIP field (will override the IP reported by docker) IP net.IP `json:"ip" description:"Primary IP address of the pod"` + // IPs is the list of IPs assigned to Pod. IPs[0] == IP. The rest of the list is additional IPs + IPs []net.IP `json:"ips" description:"list of additional ips (inclusive of IP) assigned to pod"` } // Host is an interface that plugins can use to access the kubelet. @@ -250,17 +255,37 @@ func getOnePodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceNam } // GetPodIP gets the IP of the pod by inspecting the network info inside the pod's network namespace. -func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) (net.IP, error) { - ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4") - if err != nil { - // Fall back to IPv6 address if no IPv4 address is present - ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6") - } - if err != nil { - return nil, err +// TODO (khenidak). The "primary ip" in dual stack world does not really exist. 
For now +// we are defaulting to v4 as primary +func GetPodIPs(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) ([]net.IP, error) { + if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) { + ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4") + if err != nil { + // Fall back to IPv6 address if no IPv4 address is present + ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6") + } + if err != nil { + return nil, err + } + + return []net.IP{ip}, nil } - return ip, nil + list := make([]net.IP, 0) + var err4, err6 error + if ipv4, err4 := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4"); err4 != nil { + list = append(list, ipv4) + } + + if ipv6, err6 := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6"); err6 != nil { + list = append(list, ipv6) + } + + if len(list) == 0 { + return nil, utilerrors.NewAggregate([]error{err4, err6}) + } + return list, nil + } type NoopPortMappingGetter struct{} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 832763f67ef..7951ce0d47c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -27,6 +27,7 @@ import ( "os" "path" "sort" + "strings" "sync" "sync/atomic" "time" @@ -1520,8 +1521,10 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error { // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and // set pod IP to hostIP directly in runtime.GetPodStatus - podStatus.IP = apiPodStatus.PodIP - + podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs)) + for _, ipInfo := range apiPodStatus.PodIPs { + podStatus.IPs = append(podStatus.IPs, ipInfo.IP) + } // Record the time it takes for the pod to become running. 
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning && @@ -2250,9 +2253,10 @@ func (kl *Kubelet) fastStatusUpdateOnce() { klog.Errorf(err.Error()) continue } - if node.Spec.PodCIDR != "" { - if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { - klog.Errorf("Pod CIDR update failed %v", err) + if len(node.Spec.PodCIDRs) != 0 { + podCIDRs := strings.Join(node.Spec.PodCIDRs, ",") + if _, err := kl.updatePodCIDR(podCIDRs); err != nil { + klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err) continue } kl.updateRuntimeUp() diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 410b8847bc1..7b040c3ca44 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -22,11 +22,10 @@ import ( "net" goruntime "runtime" "sort" + "strings" "time" - "k8s.io/klog" - - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -34,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" cloudprovider "k8s.io/cloud-provider" + "k8s.io/klog" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" @@ -417,11 +417,12 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { } podCIDRChanged := false - if node.Spec.PodCIDR != "" { + if len(node.Spec.PodCIDRs) != 0 { // Pod CIDR could have been updated before, so we cannot rely on // node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is // actually changed. - if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil { + podCIDRs := strings.Join(node.Spec.PodCIDRs, ",") + if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil { klog.Errorf(err.Error()) } } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 665f4309d03..e79404223a6 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -23,6 +23,7 @@ import ( goruntime "runtime" "sort" "strconv" + "strings" "sync/atomic" "testing" "time" @@ -1004,11 +1005,12 @@ func TestUpdateNodeStatusWithLease(t *testing.T) { // Report node status if it is still within the duration of nodeStatusReportFrequency. clock.Step(10 * time.Second) assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty") - podCIDR := "10.0.0.0/24" - updatedNode.Spec.PodCIDR = podCIDR + podCIDRs := []string{"10.0.0.0/24", "2000::/10"} + updatedNode.Spec.PodCIDR = podCIDRs[0] + updatedNode.Spec.PodCIDRs = podCIDRs kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain assert.NoError(t, kubelet.updateNodeStatus()) - assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now") + assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now") // 2 more action (There were 7 actions before). actions = kubeClient.Actions() assert.Len(t, actions, 9) @@ -1019,7 +1021,8 @@ func TestUpdateNodeStatusWithLease(t *testing.T) { // Update node status when keeping the pod CIDR. // Do not report node status if it is within the duration of nodeStatusReportFrequency. 
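// --- Illustrative sketch (editorial aside, not part of the diff) ----------------------
// node.Spec.PodCIDRs travels through the kubelet as a single comma-separated string: it
// is joined with strings.Join above before updatePodCIDR and split again in kubenet's
// Event handler. A round trip looks like this (example CIDR values are assumptions):
func joinPodCIDRs(cidrs []string) string {
	return strings.Join(cidrs, ",") // e.g. "10.244.1.0/24,2001:db8:1::/64"
}

func splitPodCIDRs(joined string) []string {
	if joined == "" {
		return nil
	}
	return strings.Split(joined, ",")
}
// ---------------------------------------------------------------------------------------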
clock.Step(10 * time.Second) - assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated") + assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated") + assert.NoError(t, kubelet.updateNodeStatus()) // Only 1 more action (There were 9 actions before). actions = kubeClient.Actions() diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index e01c100b96e..f0a129443a7 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1382,7 +1382,16 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po // alter the kubelet state at all. func (kl *Kubelet) convertStatusToAPIStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *v1.PodStatus { var apiPodStatus v1.PodStatus - apiPodStatus.PodIP = podStatus.IP + apiPodStatus.PodIPs = make([]v1.PodIP, 0, len(podStatus.IPs)) + for _, ip := range podStatus.IPs { + apiPodStatus.PodIPs = append(apiPodStatus.PodIPs, v1.PodIP{ + IP: ip, + }) + } + + if len(apiPodStatus.PodIPs) > 0 { + apiPodStatus.PodIP = apiPodStatus.PodIPs[0].IP + } // set status for Pods created on versions of kube older than 1.6 apiPodStatus.QOSClass = v1qos.GetPodQOS(pod) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 402e264f306..0a5e666da69 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -663,18 +663,18 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine // by container garbage collector. m.pruneInitContainersBeforeStart(pod, podStatus) - // We pass the value of the podIP down to generatePodSandboxConfig and + // We pass the value of the PRIMARY podIP down to generatePodSandboxConfig and // generateContainerConfig, which in turn passes it to various other // functions, in order to facilitate functionality that requires this // value (hosts file and downward API) and avoid races determining // the pod IP in cases where a container requires restart but the // podIP isn't in the status manager yet. // - // We default to the IP in the passed-in pod status, and overwrite it if the + // We default to the IPs in the passed-in pod status, and overwrite them if the // sandbox needs to be (re)started. - podIP := "" + var podIPs []string if podStatus != nil { - podIP = podStatus.IP + podIPs = podStatus.IPs } // Step 4: Create a sandbox for the pod if necessary. @@ -714,12 +714,20 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine // If we ever allow updating a pod from non-host-network to // host-network, we may use a stale IP. if !kubecontainer.IsHostNetworkPod(pod) { - // Overwrite the podIP passed in the pod status, since we just started the pod sandbox. - podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus) - klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod)) + // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox. 
+			podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
+			klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
 		}
 	}
 
+	// the container start routines depend on the pod IP (as in the primary pod IP);
+	// instead of checking whether 0 < len(podIPs)
+	// every time, we short-circuit it here
+	podIP := ""
+	if len(podIPs) != 0 {
+		podIP = podIPs[0]
+	}
+
 	// Get podSandboxConfig for containers to start.
 	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
 	result.AddSyncResult(configPodSandboxResult)
@@ -880,7 +888,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
 	klog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q", podSandboxIDs, podFullName)
 
 	sandboxStatuses := make([]*runtimeapi.PodSandboxStatus, len(podSandboxIDs))
-	podIP := ""
+	podIPs := []string{}
 	for idx, podSandboxID := range podSandboxIDs {
 		podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
 		if err != nil {
@@ -891,7 +899,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
 
 		// Only get pod IP from latest sandbox
 		if idx == 0 && podSandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
-			podIP = m.determinePodSandboxIP(namespace, name, podSandboxStatus)
+			podIPs = m.determinePodSandboxIPs(namespace, name, podSandboxStatus)
 		}
 	}
 
@@ -909,7 +917,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
 		ID:                uid,
 		Name:              name,
 		Namespace:         namespace,
-		IP:                podIP,
+		IPs:               podIPs,
 		SandboxStatuses:   sandboxStatuses,
 		ContainerStatuses: containerStatuses,
 	}, nil
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
index 3d4ca8eeaa8..8652350f69f 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
@@ -114,18 +114,29 @@ func makeFakePodSandbox(t *testing.T, m *kubeGenericRuntimeManager, template san
 	assert.NoError(t, err, "generatePodSandboxConfig for sandbox template %+v", template)
 
 	podSandboxID := apitest.BuildSandboxName(config.Metadata)
-	return &apitest.FakePodSandbox{
+	podSandBoxStatus := &apitest.FakePodSandbox{
 		PodSandboxStatus: runtimeapi.PodSandboxStatus{
 			Id:        podSandboxID,
 			Metadata:  config.Metadata,
 			State:     template.state,
 			CreatedAt: template.createdAt,
 			Network: &runtimeapi.PodSandboxNetworkStatus{
-				Ip: apitest.FakePodSandboxIP,
+				Ip: apitest.FakePodSandboxIPs[0],
 			},
 			Labels: config.Labels,
 		},
 	}
+	// assign additional IPs
+	additionalIPs := apitest.FakePodSandboxIPs[1:]
+	additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(additionalIPs))
+	for _, ip := range additionalIPs {
+		additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{
+			Ip: ip,
+		})
+	}
+	podSandBoxStatus.Network.AdditionalIps = additionalPodIPs
+	return podSandBoxStatus
+
 }
 
 // makeFakePodSandboxes creates a group of fake pod sandboxes based on the sandbox templates.
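// A minimal, standalone sketch of the CRI shape the fake sandbox above fills
// in: one primary Ip plus an AdditionalIps list, built from a flat IP slice
// whose first element is treated as primary. The types are simplified
// stand-ins for runtimeapi.PodSandboxNetworkStatus/runtimeapi.PodIP.
package main

import "fmt"

type criPodIP struct{ Ip string }

type criNetworkStatus struct {
	Ip            string
	AdditionalIps []*criPodIP
}

func networkStatusFromIPs(ips []string) *criNetworkStatus {
	ns := &criNetworkStatus{}
	if len(ips) == 0 {
		return ns
	}
	ns.Ip = ips[0]
	for _, ip := range ips[1:] {
		ns.AdditionalIps = append(ns.AdditionalIps, &criPodIP{Ip: ip})
	}
	return ns
}

func main() {
	ns := networkStatusFromIPs([]string{"192.168.192.168", "fd00::10"})
	fmt.Println(ns.Ip, len(ns.AdditionalIps)) // 192.168.192.168 1
}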
@@ -311,7 +322,7 @@ func TestGetPodStatus(t *testing.T) {
 	assert.Equal(t, pod.UID, podStatus.ID)
 	assert.Equal(t, pod.Name, podStatus.Name)
 	assert.Equal(t, pod.Namespace, podStatus.Namespace)
-	assert.Equal(t, apitest.FakePodSandboxIP, podStatus.IP)
+	assert.Equal(t, apitest.FakePodSandboxIPs, podStatus.IPs)
 }
 
 func TestGetPods(t *testing.T) {
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
index 3d9446bfc56..07824b3fbfd 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
@@ -214,20 +214,36 @@ func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeapi
 	return resp, nil
 }
 
-// determinePodSandboxIP determines the IP address of the given pod sandbox.
-func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName string, podSandbox *runtimeapi.PodSandboxStatus) string {
+// determinePodSandboxIPs determines the IP addresses of the given pod sandbox.
+func (m *kubeGenericRuntimeManager) determinePodSandboxIPs(podNamespace, podName string, podSandbox *runtimeapi.PodSandboxStatus) []string {
+	podIPs := make([]string, 0)
 	if podSandbox.Network == nil {
-		klog.Warningf("Pod Sandbox status doesn't have network information, cannot report IP")
-		return ""
+		klog.Warningf("Pod Sandbox status doesn't have network information, cannot report IPs")
+		return podIPs
 	}
-	ip := podSandbox.Network.Ip
-	if len(ip) != 0 && net.ParseIP(ip) == nil {
-		// ip could be an empty string if runtime is not responsible for the
-		// IP (e.g., host networking).
-		klog.Warningf("Pod Sandbox reported an unparseable IP %v", ip)
-		return ""
+
+	// ip could be an empty string if runtime is not responsible for the
+	// IP (e.g., host networking).
+
+	// pick primary IP
+	if len(podSandbox.Network.Ip) != 0 {
+		if net.ParseIP(podSandbox.Network.Ip) == nil {
+			klog.Warningf("Pod Sandbox reported an unparseable IP (Primary) %v", podSandbox.Network.Ip)
+			return nil
+		}
+		podIPs = append(podIPs, podSandbox.Network.Ip)
 	}
-	return ip
+
+	// pick additional IPs, if the CRI reported them
+	for _, podIP := range podSandbox.Network.AdditionalIps {
+		if nil == net.ParseIP(podIP.Ip) {
+			klog.Warningf("Pod Sandbox reported an unparseable IP (additional) %v", podIP.Ip)
+			return nil
+		}
+		podIPs = append(podIPs, podIP.Ip)
+	}
+
+	return podIPs
 }
 
 // getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error).
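// A minimal, standalone sketch of the validation pattern determinePodSandboxIPs
// uses above: every reported address must survive net.ParseIP, otherwise the
// whole result is dropped. sandboxIPs and its inputs are illustrative only.
package main

import (
	"fmt"
	"net"
)

func sandboxIPs(primary string, additional []string) []string {
	ips := make([]string, 0, 1+len(additional))
	if primary != "" {
		if net.ParseIP(primary) == nil {
			return nil // unparseable primary IP: report nothing
		}
		ips = append(ips, primary)
	}
	for _, ip := range additional {
		if net.ParseIP(ip) == nil {
			return nil // unparseable additional IP: report nothing
		}
		ips = append(ips, ip)
	}
	return ips
}

func main() {
	fmt.Println(sandboxIPs("10.244.1.5", []string{"fd00::5"})) // [10.244.1.5 fd00::5]
	fmt.Println(sandboxIPs("not-an-ip", nil) == nil)           // true
}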
diff --git a/pkg/kubelet/lifecycle/handlers.go b/pkg/kubelet/lifecycle/handlers.go
index 8b5a8b8d406..2b1438eca35 100644
--- a/pkg/kubelet/lifecycle/handlers.go
+++ b/pkg/kubelet/lifecycle/handlers.go
@@ -108,10 +108,10 @@ func (hr *HandlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, ha
 			klog.Errorf("Unable to get pod info, event handlers may be invalid.")
 			return "", err
 		}
-		if status.IP == "" {
+		if len(status.IPs) == 0 {
 			return "", fmt.Errorf("failed to find networking container: %v", status)
 		}
-		host = status.IP
+		host = status.IPs[0]
 	}
 	var port int
 	if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index e46a455733b..cc1cec33f98 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -344,22 +344,22 @@ func (g *GenericPLEG) cacheEnabled() bool {
 	return g.cache != nil
 }
 
-// getPodIP preserves an older cached status' pod IP if the new status has no pod IP
+// getPodIPs preserves an older cached status' pod IPs if the new status has no pod IPs
 // and its sandboxes have exited
-func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
-	if status.IP != "" {
-		return status.IP
+func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
+	if len(status.IPs) != 0 {
+		return status.IPs
 	}
 
 	oldStatus, err := g.cache.Get(pid)
-	if err != nil || oldStatus.IP == "" {
-		return ""
+	if err != nil || len(oldStatus.IPs) == 0 {
+		return nil
 	}
 
 	for _, sandboxStatus := range status.SandboxStatuses {
 		// If at least one sandbox is ready, then use this status update's pod IP
 		if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
-			return status.IP
+			return status.IPs
 		}
 	}
 
@@ -369,14 +369,14 @@ func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) s
 		// running then use the new pod IP
 		for _, containerStatus := range status.ContainerStatuses {
 			if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
-				return status.IP
+				return status.IPs
 			}
 		}
 	}
 
 	// For pods with no ready containers or sandboxes (like exited pods)
 	// use the old status' pod IP
-	return oldStatus.IP
+	return oldStatus.IPs
 }
 
 func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
@@ -398,7 +398,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
 		// When a pod is torn down, kubelet may race with PLEG and retrieve
 		// a pod status after network teardown, but the kubernetes API expects
 		// the completed pod's IP to be available after the pod is dead.
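// A minimal, standalone restatement of the getPodIPs fallback above: keep the
// newly reported IPs when present, keep them (even if empty) while the pod
// still has a ready sandbox or a created/running container, and otherwise fall
// back to the cached IPs so exited pods keep reporting an address. All names
// here are illustrative, not PLEG APIs.
package main

import "fmt"

func chooseIPs(newIPs, cachedIPs []string, podStillLive bool) []string {
	if len(newIPs) != 0 {
		return newIPs
	}
	if len(cachedIPs) == 0 {
		return nil
	}
	if podStillLive {
		// a live pod that reports no IPs genuinely has none (yet)
		return newIPs
	}
	// torn-down pod: preserve what was cached before network teardown
	return cachedIPs
}

func main() {
	fmt.Println(chooseIPs(nil, []string{"10.244.1.5", "fd00::5"}, false)) // cached IPs survive
	fmt.Println(len(chooseIPs(nil, []string{"10.244.1.5"}, true)))        // 0
}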
-		status.IP = g.getPodIP(pid, status)
+		status.IPs = g.getPodIPs(pid, status)
 	}
 
 	g.cache.Set(pod.ID, status, err, timestamp)
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index 9e648e9c13d..7756882eb4d 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -573,56 +573,73 @@ func TestRelistingWithSandboxes(t *testing.T) {
 }
 
 func TestRelistIPChange(t *testing.T) {
-	pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
-	ch := pleg.Watch()
-
-	id := types.UID("test-pod-0")
-	cState := kubecontainer.ContainerStateRunning
-	container := createTestContainer("c0", cState)
-	pod := &kubecontainer.Pod{
-		ID:         id,
-		Containers: []*kubecontainer.Container{container},
+	testCases := []struct {
+		name   string
+		podID  string
+		podIPs []string
+	}{
+		{
+			name:   "test-0",
+			podID:  "test-pod-0",
+			podIPs: []string{"192.168.1.5"},
+		},
+		{
+			name:   "test-1",
+			podID:  "test-pod-1",
+			podIPs: []string{"192.168.1.5/24", "2000::"},
+		},
 	}
-	ipAddr := "192.168.1.5/24"
-	status := &kubecontainer.PodStatus{
-		ID:                id,
-		IP:                ipAddr,
-		ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+
+	for _, tc := range testCases {
+		pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+		ch := pleg.Watch()
+
+		id := types.UID(tc.podID)
+		cState := kubecontainer.ContainerStateRunning
+		container := createTestContainer("c0", cState)
+		pod := &kubecontainer.Pod{
+			ID:         id,
+			Containers: []*kubecontainer.Container{container},
+		}
+		status := &kubecontainer.PodStatus{
+			ID:                id,
+			IPs:               tc.podIPs,
+			ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+		}
+		event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+
+		runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+		runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+		pleg.relist()
+		actualEvents := getEventsFromChannel(ch)
+		actualStatus, actualErr := pleg.cache.Get(pod.ID)
+		assert.Equal(t, status, actualStatus, tc.name)
+		assert.Nil(t, actualErr, tc.name)
+		assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+
+		// Clear the IP address and mark the container terminated
+		container = createTestContainer("c0", kubecontainer.ContainerStateExited)
+		pod = &kubecontainer.Pod{
+			ID:         id,
+			Containers: []*kubecontainer.Container{container},
+		}
+		status = &kubecontainer.PodStatus{
+			ID:                id,
+			ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
+		}
+		event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
+		runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+		runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+		pleg.relist()
+		actualEvents = getEventsFromChannel(ch)
+		actualStatus, actualErr = pleg.cache.Get(pod.ID)
+		// Must copy status to compare since its pointer gets passed through all
+		// the way to the event
+		statusCopy := *status
+		statusCopy.IPs = tc.podIPs
+		assert.Equal(t, &statusCopy, actualStatus, tc.name)
+		assert.Nil(t, actualErr, tc.name)
+		assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+	}
-	event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
-
-	runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
-	runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
-
-	pleg.relist()
-	actualEvents := getEventsFromChannel(ch)
-	actualStatus, actualErr := pleg.cache.Get(pod.ID)
-	assert.Equal(t, status, actualStatus, "test0")
-	assert.Nil(t, actualErr, "test0")
-	assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
-
-	// Clear the IP address and mark the container terminated
-	container = createTestContainer("c0", kubecontainer.ContainerStateExited)
-	pod = &kubecontainer.Pod{
-		ID:         id,
-		Containers: []*kubecontainer.Container{container},
-	}
-	status = &kubecontainer.PodStatus{
-		ID:                id,
-		ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
-	}
-	event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
-	runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
-	runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
-
-	pleg.relist()
-	actualEvents = getEventsFromChannel(ch)
-	actualStatus, actualErr = pleg.cache.Get(pod.ID)
-	// Must copy status to compare since its pointer gets passed through all
-	// the way to the event
-	statusCopy := *status
-	statusCopy.IP = ipAddr
-	assert.Equal(t, &statusCopy, actualStatus, "test0")
-	assert.Nil(t, actualErr, "test0")
-	assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
 }
diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go
index c9ae667d790..7997a86a4d4 100644
--- a/pkg/util/node/node.go
+++ b/pkg/util/node/node.go
@@ -175,7 +175,7 @@ func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) erro
 	return nil
 }
 
-// PatchNodeCIDR patches the specified node's CIDR to the given value.
+// PatchNodeCIDRs patches the specified node's podCIDR to cidrs[0] and its podCIDRs to the given list.
 func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
 	rawCidrs, err := json.Marshal(cidrs)
 	if err != nil {
@@ -189,7 +189,7 @@ func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string)
 	// set the pod cidrs list and set the old pod cidr field
 	patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs))
-
+	klog.V(4).Infof("cidrs patch bytes are: %s", string(patchBytes))
 	if _, err := c.CoreV1().Nodes().Patch(string(node), types.StrategicMergePatchType, patchBytes); err != nil {
 		return fmt.Errorf("failed to patch node CIDR: %v", err)
 	}
diff --git a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go
index afcdfa0879e..4aae5e773af 100644
--- a/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go
+++ b/staging/src/k8s.io/cri-api/pkg/apis/testing/fake_runtime_service.go
@@ -28,8 +28,8 @@ import (
 var (
 	FakeVersion = "0.1.0"
 
-	FakeRuntimeName  = "fakeRuntime"
-	FakePodSandboxIP = "192.168.192.168"
+	FakeRuntimeName   = "fakeRuntime"
+	FakePodSandboxIPs = []string{"192.168.192.168"}
 )
 
 type FakePodSandbox struct {
@@ -192,7 +192,7 @@ func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig,
 			State:     runtimeapi.PodSandboxState_SANDBOX_READY,
 			CreatedAt: createdAt,
 			Network: &runtimeapi.PodSandboxNetworkStatus{
-				Ip: FakePodSandboxIP,
+				Ip: FakePodSandboxIPs[0],
 			},
 			Labels:      config.Labels,
 			Annotations: config.Annotations,
@@ -200,7 +200,15 @@ func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig,
 		},
 		RuntimeHandler: runtimeHandler,
 	}
-
+	// assign additional IPs
+	additionalIPs := FakePodSandboxIPs[1:]
+	additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(additionalIPs))
+	for _, ip := range additionalIPs {
+		additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{
+			Ip: ip,
+		})
+	}
+	r.Sandboxes[podSandboxID].PodSandboxStatus.Network.AdditionalIps = additionalPodIPs
 	return podSandboxID, nil
 }
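Below is a minimal, standalone sketch of the strategic-merge patch body that PatchNodeCIDRs above assembles: the legacy spec.podCIDR is pinned to cidrs[0] while spec.podCIDRs carries the full dual-stack list. The buildCIDRPatch helper, its error handling, and the rawCidr derivation are illustrative; only the JSON shape follows the change above.

// buildCIDRPatch is an illustrative helper, not part of the patch.
package main

import (
	"encoding/json"
	"fmt"
)

func buildCIDRPatch(cidrs []string) ([]byte, error) {
	if len(cidrs) == 0 {
		return nil, fmt.Errorf("no CIDRs to patch")
	}
	// full dual-stack list for the new field
	rawCidrs, err := json.Marshal(cidrs)
	if err != nil {
		return nil, err
	}
	// first entry for the legacy singular field
	rawCidr, err := json.Marshal(cidrs[0])
	if err != nil {
		return nil, err
	}
	return []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s, "podCIDRs":%s}}`, rawCidr, rawCidrs)), nil
}

func main() {
	b, _ := buildCIDRPatch([]string{"10.0.0.0/24", "2000::/10"})
	fmt.Println(string(b))
	// {"spec":{"podCIDR":"10.0.0.0/24", "podCIDRs":["10.0.0.0/24","2000::/10"]}}
}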