mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)

commit dba434c4ba (parent 404dbbcfe0)
kubenet for ipv6 dualstack
@@ -63,6 +63,14 @@ func TestDecodeSinglePod(t *testing.T) {
 			SchedulerName:      core.DefaultSchedulerName,
 			EnableServiceLinks: &enableServiceLinks,
 		},
+		Status: v1.PodStatus{
+			PodIP: "1.2.3.4",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "1.2.3.4",
+				},
+			},
+		},
 	}
 	json, err := runtime.Encode(testapi.Default.Codec(), pod)
 	if err != nil {
@@ -128,6 +136,14 @@ func TestDecodePodList(t *testing.T) {
 			SchedulerName:      core.DefaultSchedulerName,
 			EnableServiceLinks: &enableServiceLinks,
 		},
+		Status: v1.PodStatus{
+			PodIP: "1.2.3.4",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "1.2.3.4",
+				},
+			},
+		},
 	}
 	podList := &v1.PodList{
 		Items: []v1.Pod{*pod},
@@ -273,8 +273,8 @@ type PodStatus struct {
 	Name string
 	// Namespace of the pod.
 	Namespace string
-	// IP of the pod.
-	IP string
+	// All IPs assigned to this pod
+	IPs []string
 	// Status of containers in the pod.
 	ContainerStatuses []*ContainerStatus
 	// Status of the pod sandbox.
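Note: the runtime-facing PodStatus now records every pod address instead of a single IP; by convention the first entry is treated as the primary. A minimal standalone sketch of that convention (type and helper names are mine, not the kubelet's):

    package main

    import "fmt"

    // podStatus mirrors the shape of the changed kubelet struct for
    // illustration only; the real type lives in pkg/kubelet/container.
    type podStatus struct {
    	Name string
    	IPs  []string // all IPs assigned to the pod; IPs[0] is treated as primary
    }

    // primaryIP returns the first recorded address, if any.
    func primaryIP(s podStatus) string {
    	if len(s.IPs) == 0 {
    		return ""
    	}
    	return s.IPs[0]
    }

    func main() {
    	s := podStatus{Name: "web-0", IPs: []string{"10.244.1.5", "fd00::5"}}
    	fmt.Println(primaryIP(s)) // 10.244.1.5
    }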
@@ -324,65 +324,75 @@ func (ds *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeapi.Rem
 	return nil, utilerrors.NewAggregate(errs)
 }
 
-// getIPFromPlugin interrogates the network plugin for an IP.
-func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (string, error) {
+// getIPsFromPlugin interrogates the network plugin for sandbox IPs.
+func (ds *dockerService) getIPsFromPlugin(sandbox *dockertypes.ContainerJSON) ([]string, error) {
 	metadata, err := parseSandboxName(sandbox.Name)
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 	msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", metadata.Namespace, metadata.Name)
 	cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID)
 	networkStatus, err := ds.network.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID)
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 	if networkStatus == nil {
-		return "", fmt.Errorf("%v: invalid network status for", msg)
+		return nil, fmt.Errorf("%v: invalid network status for", msg)
 	}
-	return networkStatus.IP.String(), nil
+
+	ips := make([]string, 0)
+	for _, ip := range networkStatus.IPs {
+		ips = append(ips, ip.String())
+	}
+	// if we don't have any ip in our list then cni is using classic primary IP only
+	if len(ips) == 0 {
+		ips = append(ips, networkStatus.IP.String())
+	}
+	return ips, nil
 }
 
-// getIP returns the ip given the output of `docker inspect` on a pod sandbox,
+// getIPs returns the ip given the output of `docker inspect` on a pod sandbox,
 // first interrogating any registered plugins, then simply trusting the ip
 // in the sandbox itself. We look for an ipv4 address before ipv6.
-func (ds *dockerService) getIP(podSandboxID string, sandbox *dockertypes.ContainerJSON) string {
+func (ds *dockerService) getIPs(podSandboxID string, sandbox *dockertypes.ContainerJSON) []string {
 	if sandbox.NetworkSettings == nil {
-		return ""
+		return nil
 	}
 	if networkNamespaceMode(sandbox) == runtimeapi.NamespaceMode_NODE {
 		// For sandboxes using host network, the shim is not responsible for
 		// reporting the IP.
-		return ""
+		return nil
 	}
 
 	// Don't bother getting IP if the pod is known and networking isn't ready
 	ready, ok := ds.getNetworkReady(podSandboxID)
 	if ok && !ready {
-		return ""
+		return nil
 	}
 
-	ip, err := ds.getIPFromPlugin(sandbox)
+	ips, err := ds.getIPsFromPlugin(sandbox)
 	if err == nil {
-		return ip
+		return ips
 	}
 
+	ips = make([]string, 0)
 	// TODO: trusting the docker ip is not a great idea. However docker uses
 	// eth0 by default and so does CNI, so if we find a docker IP here, we
 	// conclude that the plugin must have failed setup, or forgotten its ip.
 	// This is not a sensible assumption for plugins across the board, but if
 	// a plugin doesn't want this behavior, it can throw an error.
 	if sandbox.NetworkSettings.IPAddress != "" {
-		return sandbox.NetworkSettings.IPAddress
+		ips = append(ips, sandbox.NetworkSettings.IPAddress)
 	}
 	if sandbox.NetworkSettings.GlobalIPv6Address != "" {
-		return sandbox.NetworkSettings.GlobalIPv6Address
+		ips = append(ips, sandbox.NetworkSettings.GlobalIPv6Address)
 	}
 
 	// If all else fails, warn but don't return an error, as pod status
 	// should generally not return anything except fatal errors
 	// FIXME: handle network errors by restarting the pod somehow?
 	klog.Warningf("failed to read pod IP from plugin/docker: %v", err)
-	return ""
+	return ips
 }
 
 // Returns the inspect container response, the sandbox metadata, and network namespace mode
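Note: the fallback order in the new getIPs is: plugin-reported addresses win; otherwise the docker-reported IPv4 and then the global IPv6 address are appended, in that order. A runnable sketch of just that ordering (function name is illustrative, not the shim's):

    package main

    import "fmt"

    // collectSandboxIPs mimics the fallback order of the new getIPs: addresses
    // reported by the network plugin win; otherwise the docker-reported IPv4
    // and global IPv6 addresses are appended in that order.
    func collectSandboxIPs(pluginIPs []string, dockerV4, dockerV6 string) []string {
    	if len(pluginIPs) > 0 {
    		return pluginIPs
    	}
    	ips := make([]string, 0, 2)
    	if dockerV4 != "" {
    		ips = append(ips, dockerV4)
    	}
    	if dockerV6 != "" {
    		ips = append(ips, dockerV6)
    	}
    	return ips
    }

    func main() {
    	fmt.Println(collectSandboxIPs(nil, "172.17.0.2", "2001:db8::2"))
    	// [172.17.0.2 2001:db8::2]
    }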
@@ -422,11 +432,19 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P
 		state = runtimeapi.PodSandboxState_SANDBOX_READY
 	}
 
-	var IP string
+	var ips []string
 	// TODO: Remove this when sandbox is available on windows
 	// This is a workaround for windows, where sandbox is not in use, and pod IP is determined through containers belonging to the Pod.
-	if IP = ds.determinePodIPBySandboxID(podSandboxID); IP == "" {
-		IP = ds.getIP(podSandboxID, r)
+	if ips = ds.determinePodIPBySandboxID(podSandboxID); len(ips) == 0 {
+		ips = ds.getIPs(podSandboxID, r)
+	}
+
+	// ip is primary ips
+	// ips is all other ips
+	ip := ""
+	if len(ips) != 0 {
+		ip = ips[0]
+		ips = ips[1:]
 	}
 
 	labels, annotations := extractLabels(r.Config.Labels)
@@ -438,7 +456,7 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P
 		Labels:      labels,
 		Annotations: annotations,
 		Network: &runtimeapi.PodSandboxNetworkStatus{
-			Ip: IP,
+			Ip: ip,
 		},
 		Linux: &runtimeapi.LinuxPodSandboxStatus{
 			Namespaces: &runtimeapi.Namespace{
@@ -450,6 +468,14 @@ func (ds *dockerService) PodSandboxStatus(ctx context.Context, req *runtimeapi.P
 			},
 		},
 	}
+	// add additional IPs
+	additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(ips))
+	for _, ip := range ips {
+		additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{
+			Ip: ip,
+		})
+	}
+	status.Network.AdditionalIps = additionalPodIPs
 	return &runtimeapi.PodSandboxStatusResponse{Status: status}, nil
 }
 
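Note: PodSandboxStatus now splits the collected list into a primary Ip plus AdditionalIps. A sketch of the split (hypothetical helper, not part of the commit):

    package main

    import "fmt"

    // splitPrimary reproduces the convention used by PodSandboxStatus above:
    // the first IP becomes Network.Ip, the rest become AdditionalIps.
    func splitPrimary(ips []string) (primary string, additional []string) {
    	if len(ips) == 0 {
    		return "", nil
    	}
    	return ips[0], ips[1:]
    }

    func main() {
    	p, rest := splitPrimary([]string{"10.244.1.5", "fd00::5"})
    	fmt.Println(p, rest) // 10.244.1.5 [fd00::5]
    }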
@@ -106,7 +106,7 @@ func TestSandboxStatus(t *testing.T) {
 		State:     state,
 		CreatedAt: ct,
 		Metadata:  config.Metadata,
-		Network:   &runtimeapi.PodSandboxNetworkStatus{Ip: podIP},
+		Network:   &runtimeapi.PodSandboxNetworkStatus{Ip: podIP, AdditionalIps: []*runtimeapi.PodIP{}},
 		Linux: &runtimeapi.LinuxPodSandboxStatus{
 			Namespaces: &runtimeapi.Namespace{
 				Options: &runtimeapi.NamespaceOption{
@@ -142,6 +142,7 @@ func TestSandboxStatus(t *testing.T) {
 	require.NoError(t, err)
 	// IP not valid after sandbox stop
 	expected.Network.Ip = ""
+	expected.Network.AdditionalIps = []*runtimeapi.PodIP{}
 	statusResp, err = ds.PodSandboxStatus(getTestCTX(), &runtimeapi.PodSandboxStatusRequest{PodSandboxId: id})
 	require.NoError(t, err)
 	assert.Equal(t, expected, statusResp.Status)
@@ -161,14 +162,13 @@ func TestSandboxStatusAfterRestart(t *testing.T) {
 	config := makeSandboxConfig("foo", "bar", "1", 0)
 	r := rand.New(rand.NewSource(0)).Uint32()
 	podIP := fmt.Sprintf("10.%d.%d.%d", byte(r>>16), byte(r>>8), byte(r))
-
 	state := runtimeapi.PodSandboxState_SANDBOX_READY
 	ct := int64(0)
 	expected := &runtimeapi.PodSandboxStatus{
 		State:     state,
 		CreatedAt: ct,
 		Metadata:  config.Metadata,
-		Network:   &runtimeapi.PodSandboxNetworkStatus{Ip: podIP},
+		Network:   &runtimeapi.PodSandboxNetworkStatus{Ip: podIP, AdditionalIps: []*runtimeapi.PodIP{}},
 		Linux: &runtimeapi.LinuxPodSandboxStatus{
 			Namespaces: &runtimeapi.Namespace{
 				Options: &runtimeapi.NamespaceOption{
@@ -136,8 +136,8 @@ func (ds *dockerService) updateCreateConfig(
 	return nil
 }
 
-func (ds *dockerService) determinePodIPBySandboxID(uid string) string {
-	return ""
+func (ds *dockerService) determinePodIPBySandboxID(uid string) []string {
+	return nil
 }
 
 func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) {
@@ -45,9 +45,9 @@ func (ds *dockerService) updateCreateConfig(
 	return nil
 }
 
-func (ds *dockerService) determinePodIPBySandboxID(uid string) string {
+func (ds *dockerService) determinePodIPBySandboxID(uid string) []string {
 	klog.Warningf("determinePodIPBySandboxID is unsupported in this build")
-	return ""
+	return nil
 }
 
 func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) {
@@ -97,7 +97,7 @@ func applyWindowsContainerSecurityContext(wsc *runtimeapi.WindowsContainerSecuri
 	}
 }
 
-func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string {
+func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) []string {
 	opts := dockertypes.ContainerListOptions{
 		All:     true,
 		Filters: dockerfilters.NewArgs(),
@@ -108,7 +108,7 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string {
 	f.AddLabel(sandboxIDLabelKey, sandboxID)
 	containers, err := ds.client.ListContainers(opts)
 	if err != nil {
-		return ""
+		return nil
 	}
 
 	for _, c := range containers {
@@ -144,8 +144,8 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string {
 			// Hyper-V only supports one container per Pod yet and the container will have a different
 			// IP address from sandbox. Return the first non-sandbox container IP as POD IP.
 			// TODO(feiskyer): remove this workaround after Hyper-V supports multiple containers per Pod.
-			if containerIP := ds.getIP(c.ID, r); containerIP != "" {
-				return containerIP
+			if containerIPs := ds.getIPs(c.ID, r); len(containerIPs) != 0 {
+				return containerIPs
 			}
 		} else {
 			// Do not return any IP, so that we would continue and get the IP of the Sandbox.
@@ -153,17 +153,17 @@ func (ds *dockerService) determinePodIPBySandboxID(sandboxID string) string {
 			// to replicate the DNS registry key to the Workload container (IP/Gateway/MAC is
 			// set separately than DNS).
 			// TODO(feiskyer): remove this workaround after Namespace is supported in Windows RS5.
-			ds.getIP(sandboxID, r)
+			ds.getIPs(sandboxID, r)
 		}
 	} else {
 		// ds.getIP will call the CNI plugin to fetch the IP
-		if containerIP := ds.getIP(c.ID, r); containerIP != "" {
-			return containerIP
+		if containerIPs := ds.getIPs(c.ID, r); len(containerIPs) != 0 {
+			return containerIPs
 		}
 	}
 	}
 
-	return ""
+	return nil
 }
 
 func getNetworkNamespace(c *dockertypes.ContainerJSON) (string, error) {
@@ -9,6 +9,7 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/kubelet/dockershim/network",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/features:go_default_library",
        "//pkg/kubelet/apis/config:go_default_library",
        "//pkg/kubelet/container:go_default_library",
        "//pkg/kubelet/dockershim/network/hostport:go_default_library",
@@ -18,6 +19,7 @@ go_library(
        "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
        "//vendor/k8s.io/utils/exec:go_default_library",
     ],
@@ -69,12 +69,15 @@ func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name strin
 		return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id)
 	}
 
-	ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
+	ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
 	if err != nil {
 		return nil, err
 	}
 
-	return &network.PodNetworkStatus{IP: ip}, nil
+	return &network.PodNetworkStatus{
+		IP:  ips[0],
+		IPs: ips,
+	}, nil
 }
 
 // buildDNSCapabilities builds cniDNSConfig from runtimeapi.DNSConfig.
@@ -36,6 +36,7 @@ go_library(
        "//pkg/kubelet/dockershim/network:go_default_library",
     ],
     "@io_bazel_rules_go//go/platform:linux": [
+        "//pkg/features:go_default_library",
        "//pkg/kubelet/apis/config:go_default_library",
        "//pkg/kubelet/container:go_default_library",
        "//pkg/kubelet/dockershim/network:go_default_library",
@@ -48,6 +49,7 @@ go_library(
        "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//vendor/github.com/containernetworking/cni/libcni:go_default_library",
        "//vendor/github.com/containernetworking/cni/pkg/types:go_default_library",
        "//vendor/github.com/containernetworking/cni/pkg/types/020:go_default_library",
@@ -55,6 +57,7 @@ go_library(
        "//vendor/golang.org/x/sys/unix:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
        "//vendor/k8s.io/utils/exec:go_default_library",
+        "//vendor/k8s.io/utils/net:go_default_library",
     ],
     "@io_bazel_rules_go//go/platform:nacl": [
        "//pkg/kubelet/apis/config:go_default_library",
@@ -105,6 +108,7 @@ go_test(
        "//pkg/util/bandwidth:go_default_library",
        "//pkg/util/iptables/testing:go_default_library",
        "//pkg/util/sysctl/testing:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
        "//vendor/github.com/stretchr/testify/mock:go_default_library",
        "//vendor/k8s.io/utils/exec:go_default_library",
@@ -46,6 +46,10 @@ import (
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
 	utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
 	utilexec "k8s.io/utils/exec"
+
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	kubefeatures "k8s.io/kubernetes/pkg/features"
+	netutils "k8s.io/utils/net"
 )
 
 const (
@@ -63,6 +67,29 @@ const (
 	// defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam
 	// https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends
 	defaultIPAMDir = "/var/lib/cni/networks"
+
+	zeroCIDRv6 = "::/0"
+	zeroCIDRv4 = "0.0.0.0/0"
+
+	NET_CONFIG_TEMPLATE = `{
+  "cniVersion": "0.1.0",
+  "name": "kubenet",
+  "type": "bridge",
+  "bridge": "%s",
+  "mtu": %d,
+  "addIf": "%s",
+  "isGateway": true,
+  "ipMasq": false,
+  "hairpinMode": %t,
+  "ipam": {
+    "type": "host-local",
+    "ranges": [%s],
+    "routes": [
+      { "dst": "%s" },
+      { "dst": "%s" }
+    ]
+  }
+}`
 )
 
 // CNI plugins required by kubenet in /opt/cni/bin or user-specified directory
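Note: the new template defers subnet/gateway pairs to a "ranges" placeholder (filled by getRangesConfig, added at the end of this commit) and carries one default route per family. A trimmed, runnable rendering sketch; the template is shortened and the values are examples, not output of the real code:

    package main

    import "fmt"

    // A trimmed copy of the new template, kept only to show how the "ranges"
    // and route placeholders are filled.
    const tmpl = `{"ipam":{"type":"host-local","ranges":[%s],"routes":[{"dst":"%s"},{"dst":"%s"}]}}`

    func main() {
    	// one IPv4 and one IPv6 range, in the shape getRangesConfig produces
    	ranges := `[{"subnet":"10.244.1.0/24","gateway":"10.244.1.1"}],[{"subnet":"fd00:1::/64","gateway":"fd00:1::1"}]`
    	fmt.Printf(tmpl+"\n", ranges, "0.0.0.0/0", "::/0")
    }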
@@ -77,7 +104,7 @@ type kubenetNetworkPlugin struct {
 	cniConfig       libcni.CNI
 	bandwidthShaper bandwidth.Shaper
 	mu              sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization
-	podIPs          map[kubecontainer.ContainerID]string
+	podIPs          map[kubecontainer.ContainerID]utilsets.String
 	mtu             int
 	execer          utilexec.Interface
 	nsenterPath     string
@@ -88,33 +115,36 @@ type kubenetNetworkPlugin struct {
 	hostportSyncer  hostport.HostportSyncer
 	hostportManager hostport.HostPortManager
 	iptables        utiliptables.Interface
+	iptablesv6      utiliptables.Interface
 	sysctl          utilsysctl.Interface
 	ebtables        utilebtables.Interface
 	// binDirs is passed by kubelet cni-bin-dir parameter.
 	// kubenet will search for CNI binaries in DefaultCNIDir first, then continue to binDirs.
 	binDirs           []string
 	nonMasqueradeCIDR string
-	podCidr           string
-	gateway           net.IP
 	cacheDir          string
+	podCIDRs          []*net.IPNet
+	podGateways       []net.IP
 }
 
 func NewPlugin(networkPluginDirs []string, cacheDir string) network.NetworkPlugin {
-	protocol := utiliptables.ProtocolIpv4
 	execer := utilexec.New()
 	dbus := utildbus.New()
-	sysctl := utilsysctl.New()
-	iptInterface := utiliptables.New(execer, dbus, protocol)
+	iptInterface := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4)
+	iptInterfacev6 := utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6)
 	return &kubenetNetworkPlugin{
-		podIPs:            make(map[kubecontainer.ContainerID]string),
+		podIPs:            make(map[kubecontainer.ContainerID]utilsets.String),
 		execer:            utilexec.New(),
 		iptables:          iptInterface,
-		sysctl:            sysctl,
+		iptablesv6:        iptInterfacev6,
+		sysctl:            utilsysctl.New(),
 		binDirs:           append([]string{DefaultCNIDir}, networkPluginDirs...),
 		hostportSyncer:    hostport.NewHostportSyncer(iptInterface),
 		hostportManager:   hostport.NewHostportManager(iptInterface),
 		nonMasqueradeCIDR: "10.0.0.0/8",
 		cacheDir:          cacheDir,
+		podCIDRs:          make([]*net.IPNet, 0),
+		podGateways:       make([]net.IP, 0),
 	}
 }
 
@@ -171,8 +201,14 @@ func (plugin *kubenetNetworkPlugin) Init(host network.Host, hairpinMode kubeletc
 
 // TODO: move thic logic into cni bridge plugin and remove this from kubenet
 func (plugin *kubenetNetworkPlugin) ensureMasqRule() error {
-	if plugin.nonMasqueradeCIDR != "0.0.0.0/0" {
-		if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting,
+	if plugin.nonMasqueradeCIDR != zeroCIDRv4 && plugin.nonMasqueradeCIDR != zeroCIDRv6 {
+		// switch according to target nonMasqueradeCidr ip family
+		ipt := plugin.iptables
+		if netutils.IsIPv6CIDRString(plugin.nonMasqueradeCIDR) {
+			ipt = plugin.iptablesv6
+		}
+
+		if _, err := ipt.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting,
 			"-m", "comment", "--comment", "kubenet: SNAT for outbound traffic from cluster",
 			"-m", "addrtype", "!", "--dst-type", "LOCAL",
 			"!", "-d", plugin.nonMasqueradeCIDR,
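Note: ensureMasqRule now picks the iptables interface by the address family of nonMasqueradeCIDR. A stdlib-only stand-in for the netutils.IsIPv6CIDRString check used above:

    package main

    import (
    	"fmt"
    	"net"
    )

    // isIPv6CIDR is a stand-in for netutils.IsIPv6CIDRString: parse the CIDR
    // and check whether its address has no 4-byte form.
    func isIPv6CIDR(cidr string) bool {
    	ip, _, err := net.ParseCIDR(cidr)
    	return err == nil && ip.To4() == nil
    }

    func main() {
    	fmt.Println(isIPv6CIDR("10.0.0.0/8")) // false -> use the v4 iptables interface
    	fmt.Println(isIPv6CIDR("fd00::/64"))  // true  -> use the v6 interface
    }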
@@ -207,27 +243,8 @@ func findMinMTU() (*net.Interface, error) {
 	return &intfs[defIntfIndex], nil
 }
 
-const NET_CONFIG_TEMPLATE = `{
-  "cniVersion": "0.1.0",
-  "name": "kubenet",
-  "type": "bridge",
-  "bridge": "%s",
-  "mtu": %d,
-  "addIf": "%s",
-  "isGateway": true,
-  "ipMasq": false,
-  "hairpinMode": %t,
-  "ipam": {
-    "type": "host-local",
-    "subnet": "%s",
-    "gateway": "%s",
-    "routes": [
-      { "dst": "0.0.0.0/0" }
-    ]
-  }
-}`
-
 func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interface{}) {
+	var err error
 	if name != network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE {
 		return
 	}
@@ -246,33 +263,54 @@ func (plugin *kubenetNetworkPlugin) Event(name string, details map[string]interf
 		return
 	}
 
-	klog.V(5).Infof("PodCIDR is set to %q", podCIDR)
-	_, cidr, err := net.ParseCIDR(podCIDR)
-	if err == nil {
-		setHairpin := plugin.hairpinMode == kubeletconfig.HairpinVeth
-		// Set bridge address to first address in IPNet
-		cidr.IP[len(cidr.IP)-1] += 1
-
-		json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.mtu, network.DefaultInterfaceName, setHairpin, podCIDR, cidr.IP.String())
-		klog.V(2).Infof("CNI network config set to %v", json)
-		plugin.netConfig, err = libcni.ConfFromBytes([]byte(json))
-		if err == nil {
-			klog.V(5).Infof("CNI network config:\n%s", json)
-
-			// Ensure cbr0 has no conflicting addresses; CNI's 'bridge'
-			// plugin will bail out if the bridge has an unexpected one
-			plugin.clearBridgeAddressesExcept(cidr)
-		}
-		plugin.podCidr = podCIDR
-		plugin.gateway = cidr.IP
-	}
+	klog.V(4).Infof("kubenet: PodCIDR is set to %q", podCIDR)
+	podCIDRs := strings.Split(podCIDR, ",")
+
+	// reset to one cidr if dual stack is not enabled
+	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) && len(podCIDRs) > 1 {
+		klog.V(2).Infof("This node has multiple pod cidrs assigned and dual stack is not enabled. ignoring all except first cidr")
+		podCIDRs = podCIDRs[0:1]
+	}
+
+	for idx, currentPodCIDR := range podCIDRs {
+		_, cidr, err := net.ParseCIDR(currentPodCIDR)
+		if nil != err {
+			klog.Warningf("Failed to generate CNI network config with cidr %s at indx:%v: %v", currentPodCIDR, idx, err)
+			return
+		}
+		// create list of ips and gateways
+		cidr.IP[len(cidr.IP)-1] += 1 // Set bridge address to first address in IPNet
+		plugin.podCIDRs = append(plugin.podCIDRs, cidr)
+		plugin.podGateways = append(plugin.podGateways, cidr.IP)
+	}
+
+	//setup hairpinMode
+	setHairpin := plugin.hairpinMode == kubeletconfig.HairpinVeth
+
+	json := fmt.Sprintf(NET_CONFIG_TEMPLATE, BridgeName, plugin.mtu, network.DefaultInterfaceName, setHairpin, plugin.getRangesConfig(), zeroCIDRv4, zeroCIDRv6)
+	klog.V(4).Infof("CNI network config set to %v", json)
+	plugin.netConfig, err = libcni.ConfFromBytes([]byte(json))
 	if err != nil {
-		klog.Warningf("Failed to generate CNI network config: %v", err)
+		klog.Warningf("** failed to set up CNI with %v err:%v", json, err)
+		// just incase it was set by mistake
+		plugin.netConfig = nil
+		// we bail out by clearing the *entire* list
+		// of addresses assigned to cbr0
+		plugin.clearUnusedBridgeAddresses()
 	}
 }
 
-func (plugin *kubenetNetworkPlugin) clearBridgeAddressesExcept(keep *net.IPNet) {
+// clear all address on bridge except those operated on by kubenet
+func (plugin *kubenetNetworkPlugin) clearUnusedBridgeAddresses() {
+	cidrIncluded := func(list []*net.IPNet, check *net.IPNet) bool {
+		for _, thisNet := range list {
+			if utilnet.IPNetEqual(thisNet, check) {
+				return true
+			}
+		}
+		return false
+	}
+
 	bridge, err := netlink.LinkByName(BridgeName)
 	if err != nil {
 		return
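Note: the Event handler now accepts a comma-separated dual-stack pod CIDR and derives one bridge gateway per subnet by incrementing the network address. A runnable sketch of that derivation (error handling shortened; names are mine):

    package main

    import (
    	"fmt"
    	"net"
    	"strings"
    )

    // gatewaysFromPodCIDR mirrors the Event handler's loop: split the
    // kubelet's comma-separated pod CIDR list and take the first address of
    // each subnet as the bridge gateway.
    func gatewaysFromPodCIDR(podCIDR string) ([]string, error) {
    	var gws []string
    	for _, c := range strings.Split(podCIDR, ",") {
    		_, cidr, err := net.ParseCIDR(c)
    		if err != nil {
    			return nil, err
    		}
    		cidr.IP[len(cidr.IP)-1]++ // first usable address in the subnet
    		gws = append(gws, cidr.IP.String())
    	}
    	return gws, nil
    }

    func main() {
    	gws, _ := gatewaysFromPodCIDR("10.244.1.0/24,fd00:1::/64")
    	fmt.Println(gws) // [10.244.1.1 fd00:1::1]
    }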
@@ -280,11 +318,12 @@ func (plugin *kubenetNetworkPlugin) clearBridgeAddressesExcept(keep *net.IPNet)
 
 	addrs, err := netlink.AddrList(bridge, unix.AF_INET)
 	if err != nil {
+		klog.V(2).Infof("attempting to get address for interface: %s failed with err:%v", BridgeName, err)
 		return
 	}
 
 	for _, addr := range addrs {
-		if !utilnet.IPNetEqual(addr.IPNet, keep) {
+		if !cidrIncluded(plugin.podCIDRs, addr.IPNet) {
 			klog.V(2).Infof("Removing old address %s from %s", addr.IPNet.String(), BridgeName)
 			netlink.AddrDel(bridge, &addr)
 		}
@@ -301,6 +340,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
 
 // setup sets up networking through CNI using the given ns/name and sandbox ID.
 func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
+	var ipv4, ipv6 net.IP
 	// Disable DAD so we skip the kernel delay on bringing up new interfaces.
 	if err := plugin.disableContainerDAD(id); err != nil {
 		klog.V(3).Infof("Failed to disable DAD in container: %v", err)
@@ -321,14 +361,19 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
 	if err != nil {
 		return fmt.Errorf("unable to understand network config: %v", err)
 	}
-	if res.IP4 == nil {
-		return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id)
-	}
-	ip4 := res.IP4.IP.IP.To4()
-	if ip4 == nil {
-		return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4)
+	//TODO: v1.16 (khenidak) update NET_CONFIG_TEMPLATE to CNI version 0.3.0 or later so
+	// that we get multiple IP addresses in the returned Result structure
+	if res.IP4 != nil {
+		ipv4 = res.IP4.IP.IP.To4()
 	}
+
+	if res.IP6 != nil {
+		ipv6 = res.IP6.IP.IP
+	}
+
+	if ipv4 == nil && ipv6 == nil {
+		return fmt.Errorf("cni didn't report ipv4 ipv6")
+	}
 	// Put the container bridge into promiscuous mode to force it to accept hairpin packets.
 	// TODO: Remove this once the kernel bug (#20096) is fixed.
 	if plugin.hairpinMode == kubeletconfig.PromiscuousBridge {
@@ -348,57 +393,97 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
 		plugin.syncEbtablesDedupRules(link.Attrs().HardwareAddr)
 	}
 
-	plugin.podIPs[id] = ip4.String()
+	// add the ip to tracked ips
+	if ipv4 != nil {
+		plugin.addPodIP(id, ipv4.String())
+	}
+	if ipv6 != nil {
+		plugin.addPodIP(id, ipv6.String())
+	}
 
-	// The first SetUpPod call creates the bridge; get a shaper for the sake of initialization
-	// TODO: replace with CNI traffic shaper plugin
+	if err := plugin.addTrafficShaping(id, annotations); err != nil {
+		return err
+	}
+
+	return plugin.addPortMapping(id, name, namespace)
+}
+
+// The first SetUpPod call creates the bridge; get a shaper for the sake of initialization
+// TODO: replace with CNI traffic shaper plugin
+func (plugin *kubenetNetworkPlugin) addTrafficShaping(id kubecontainer.ContainerID, annotations map[string]string) error {
 	shaper := plugin.shaper()
 	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
 	if err != nil {
 		return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
 	}
-	if egress != nil || ingress != nil {
-		if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
-			return fmt.Errorf("Failed to add pod to shaper: %v", err)
-		}
+	iplist, exists := plugin.getCachedPodIPs(id)
+	if !exists {
+		return fmt.Errorf("pod %s does not have recorded ips", id)
 	}
 
-	// TODO: replace with CNI port-forwarding plugin
-	portMappings, err := plugin.host.GetPodPortMappings(id.ID)
-	if err != nil {
-		return err
-	}
-	if portMappings != nil && len(portMappings) > 0 {
-		if err := plugin.hostportManager.Add(id.ID, &hostport.PodPortMapping{
-			Namespace:    namespace,
-			Name:         name,
-			PortMappings: portMappings,
-			IP:           ip4,
-			HostNetwork:  false,
-		}, BridgeName); err != nil {
-			return err
+	if egress != nil || ingress != nil {
+		for _, ip := range iplist {
+			mask := 32
+			if netutils.IsIPv6String(ip) {
+				mask = 128
+			}
+			if err != nil {
+				return fmt.Errorf("failed to setup traffic shaping for pod ip%s", ip)
+			}
+
+			if err := shaper.ReconcileCIDR(fmt.Sprintf("%v/%v", ip, mask), egress, ingress); err != nil {
+				return fmt.Errorf("Failed to add pod to shaper: %v", err)
+			}
 		}
 	}
 	return nil
 }
 
-func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
-	plugin.mu.Lock()
-	defer plugin.mu.Unlock()
+// TODO: replace with CNI port-forwarding plugin
+func (plugin *kubenetNetworkPlugin) addPortMapping(id kubecontainer.ContainerID, name, namespace string) error {
+	portMappings, err := plugin.host.GetPodPortMappings(id.ID)
+	if err != nil {
+		return err
+	}
+
+	if len(portMappings) == 0 {
+		return nil
+	}
+
+	iplist, exists := plugin.getCachedPodIPs(id)
+	if !exists {
+		return fmt.Errorf("pod %s does not have recorded ips", id)
+	}
+
+	for _, ip := range iplist {
+		pm := &hostport.PodPortMapping{
+			Namespace:    namespace,
+			Name:         name,
+			PortMappings: portMappings,
+			IP:           net.ParseIP(ip),
+			HostNetwork:  false,
+		}
+		if err := plugin.hostportManager.Add(id.ID, pm, BridgeName); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
 	start := time.Now()
-	defer func() {
-		klog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
-	}()
 
 	if err := plugin.Status(); err != nil {
 		return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
 	}
 
+	defer func() {
+		klog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
+	}()
+
 	if err := plugin.setup(namespace, name, id, annotations); err != nil {
-		// Make sure everything gets cleaned up on errors
-		podIP, _ := plugin.podIPs[id]
-		if err := plugin.teardown(namespace, name, id, podIP); err != nil {
+		if err := plugin.teardown(namespace, name, id); err != nil {
 			// Not a hard error or warning
 			klog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
 		}
@@ -415,27 +500,12 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
 
 // Tears down as much of a pod's network as it can even if errors occur. Returns
 // an aggregate error composed of all errors encountered during the teardown.
-func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error {
+func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID) error {
 	errList := []error{}
 
-	if podIP != "" {
-		klog.V(5).Infof("Removing pod IP %s from shaper", podIP)
-		// shaper wants /32
-		if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
-			// Possible bandwidth shaping wasn't enabled for this pod anyways
-			klog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
-		}
-
-		delete(plugin.podIPs, id)
-	}
-
+	// no ip dependent actions
 	if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
-		// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
-		if podIP != "" {
-			klog.Warningf("Failed to delete container from kubenet: %v", err)
-		} else {
-			errList = append(errList, err)
-		}
+		errList = append(errList, err)
 	}
 
 	portMappings, err := plugin.host.GetPodPortMappings(id.ID)
|
|||||||
errList = append(errList, err)
|
errList = append(errList, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iplist, exists := plugin.getCachedPodIPs(id)
|
||||||
|
if !exists || len(iplist) == 0 {
|
||||||
|
klog.V(5).Infof("container %s (%s/%s) does not have recorded. ignoring teardown call", id, name, namespace)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ip := range iplist {
|
||||||
|
klog.V(5).Infof("Removing pod IP %s from shaper for (%s/%s)", ip, name, namespace)
|
||||||
|
// shaper uses a cidr, but we are using a single IP.
|
||||||
|
isV6 := netutils.IsIPv6String(ip)
|
||||||
|
mask := "32"
|
||||||
|
if isV6 {
|
||||||
|
mask = "128"
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := plugin.shaper().Reset(fmt.Sprintf("%s/%s", ip, mask)); err != nil {
|
||||||
|
// Possible bandwidth shaping wasn't enabled for this pod anyways
|
||||||
|
klog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", ip, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin.removePodIP(id, ip)
|
||||||
|
}
|
||||||
return utilerrors.NewAggregate(errList)
|
return utilerrors.NewAggregate(errList)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
|
func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
|
||||||
plugin.mu.Lock()
|
|
||||||
defer plugin.mu.Unlock()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
klog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name)
|
klog.V(4).Infof("TearDownPod took %v for %s/%s", time.Since(start), namespace, name)
|
||||||
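Note: teardown resets the shaper with a /32 or /128 host route depending on the family of each cached IP. A stdlib sketch of the mask choice, with a To4 check standing in for netutils.IsIPv6String:

    package main

    import (
    	"fmt"
    	"net"
    )

    // hostCIDR turns a single pod IP into the host-route CIDR the shaper
    // expects: /32 for IPv4, /128 for IPv6.
    func hostCIDR(ip string) string {
    	mask := "32"
    	if parsed := net.ParseIP(ip); parsed != nil && parsed.To4() == nil {
    		mask = "128"
    	}
    	return fmt.Sprintf("%s/%s", ip, mask)
    }

    func main() {
    	fmt.Println(hostCIDR("10.244.1.5")) // 10.244.1.5/32
    	fmt.Println(hostCIDR("fd00::5"))    // fd00::5/128
    }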
@@ -467,9 +557,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
 		return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods")
 	}
 
-	podIP, _ := plugin.podIPs[id]
-	if err := plugin.teardown(namespace, name, id, podIP); err != nil {
+	// no cached IP is Ok during teardown
+	if err := plugin.teardown(namespace, name, id); err != nil {
 		return err
 	}
 
|
|||||||
if err := plugin.ensureMasqRule(); err != nil {
|
if err := plugin.ensureMasqRule(); err != nil {
|
||||||
klog.Errorf("Failed to ensure MASQ rule: %v", err)
|
klog.Errorf("Failed to ensure MASQ rule: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
|
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
|
||||||
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
|
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
|
||||||
func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
|
func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
|
||||||
plugin.mu.Lock()
|
// try cached version
|
||||||
defer plugin.mu.Unlock()
|
networkStatus := plugin.getNetworkStatus(id)
|
||||||
// Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first.
|
if networkStatus != nil {
|
||||||
if podIP, ok := plugin.podIPs[id]; ok {
|
return networkStatus, nil
|
||||||
return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// not a cached version, get via network ns
|
||||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
|
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
|
||||||
@ -498,13 +585,46 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
|
|||||||
if netnsPath == "" {
|
if netnsPath == "" {
|
||||||
return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id)
|
return nil, fmt.Errorf("Cannot find the network namespace, skipping pod network status for container %q", id)
|
||||||
}
|
}
|
||||||
ip, err := network.GetPodIP(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
|
ips, err := network.GetPodIPs(plugin.execer, plugin.nsenterPath, netnsPath, network.DefaultInterfaceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
plugin.podIPs[id] = ip.String()
|
// cache the ips
|
||||||
return &network.PodNetworkStatus{IP: ip}, nil
|
for _, ip := range ips {
|
||||||
|
plugin.addPodIP(id, ip.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// return from cached
|
||||||
|
return plugin.getNetworkStatus(id), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns networkstatus
|
||||||
|
func (plugin *kubenetNetworkPlugin) getNetworkStatus(id kubecontainer.ContainerID) *network.PodNetworkStatus {
|
||||||
|
// Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first.
|
||||||
|
iplist, ok := plugin.getCachedPodIPs(id)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// sort making v4 first
|
||||||
|
// TODO: (khenidak) IPv6 beta stage.
|
||||||
|
// This - forced sort - could be avoided by checking which cidr that an IP belongs
|
||||||
|
// to, then placing the IP according to cidr index. But before doing that. Check how IP is collected
|
||||||
|
// across all of kubelet code (against cni and cri).
|
||||||
|
ips := make([]net.IP, 0)
|
||||||
|
for _, ip := range iplist {
|
||||||
|
isV6 := netutils.IsIPv6String(ip)
|
||||||
|
if !isV6 {
|
||||||
|
ips = append([]net.IP{net.ParseIP(ip)}, ips...)
|
||||||
|
} else {
|
||||||
|
ips = append(ips, net.ParseIP(ip))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &network.PodNetworkStatus{
|
||||||
|
IP: ips[0],
|
||||||
|
IPs: ips,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (plugin *kubenetNetworkPlugin) Status() error {
|
func (plugin *kubenetNetworkPlugin) Status() error {
|
||||||
@ -571,11 +691,8 @@ func (plugin *kubenetNetworkPlugin) addContainerToNetwork(config *libcni.Network
|
|||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
|
klog.V(3).Infof("Adding %s/%s to '%s' with CNI '%s' plugin and runtime: %+v", namespace, name, config.Network.Name, config.Network.Type, rt)
|
||||||
// The network plugin can take up to 3 seconds to execute,
|
|
||||||
// so yield the lock while it runs.
|
|
||||||
plugin.mu.Unlock()
|
|
||||||
res, err := plugin.cniConfig.AddNetwork(context.TODO(), config, rt)
|
res, err := plugin.cniConfig.AddNetwork(context.TODO(), config, rt)
|
||||||
plugin.mu.Lock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Error adding container to network: %v", err)
|
return nil, fmt.Errorf("Error adding container to network: %v", err)
|
||||||
}
|
}
|
||||||
@ -600,8 +717,9 @@ func (plugin *kubenetNetworkPlugin) delContainerFromNetwork(config *libcni.Netwo
|
|||||||
|
|
||||||
// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
|
// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
|
||||||
// initializes it and ensures the bridge is appropriately configured
|
// initializes it and ensures the bridge is appropriately configured
|
||||||
// This function should only be called while holding the `plugin.mu` lock
|
|
||||||
func (plugin *kubenetNetworkPlugin) shaper() bandwidth.Shaper {
|
func (plugin *kubenetNetworkPlugin) shaper() bandwidth.Shaper {
|
||||||
|
plugin.mu.Lock()
|
||||||
|
defer plugin.mu.Unlock()
|
||||||
if plugin.bandwidthShaper == nil {
|
if plugin.bandwidthShaper == nil {
|
||||||
plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName)
|
plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName)
|
||||||
plugin.bandwidthShaper.ReconcileInterface()
|
plugin.bandwidthShaper.ReconcileInterface()
|
||||||
@ -624,30 +742,43 @@ func (plugin *kubenetNetworkPlugin) syncEbtablesDedupRules(macAddr net.HardwareA
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), plugin.gateway.String(), plugin.podCidr)
|
// ensure custom chain exists
|
||||||
_, err = plugin.ebtables.EnsureChain(utilebtables.TableFilter, dedupChain)
|
_, err = plugin.ebtables.EnsureChain(utilebtables.TableFilter, dedupChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to ensure %v chain %v", utilebtables.TableFilter, dedupChain)
|
klog.Errorf("Failed to ensure %v chain %v", utilebtables.TableFilter, dedupChain)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// jump to custom chain to the chain from core tables
|
||||||
_, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, utilebtables.ChainOutput, "-j", string(dedupChain))
|
_, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, utilebtables.ChainOutput, "-j", string(dedupChain))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to ensure %v chain %v jump to %v chain: %v", utilebtables.TableFilter, utilebtables.ChainOutput, dedupChain, err)
|
klog.Errorf("Failed to ensure %v chain %v jump to %v chain: %v", utilebtables.TableFilter, utilebtables.ChainOutput, dedupChain, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
commonArgs := []string{"-p", "IPv4", "-s", macAddr.String(), "-o", "veth+"}
|
// per gateway rule
|
||||||
_, err = plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.gateway.String(), "-j", "ACCEPT")...)
|
for idx, gw := range plugin.podGateways {
|
||||||
if err != nil {
|
klog.V(3).Infof("Filtering packets with ebtables on mac address: %v, gateway: %v, pod CIDR: %v", macAddr.String(), gw.String(), plugin.podCIDRs[idx].String())
|
||||||
klog.Errorf("Failed to ensure packets from cbr0 gateway to be accepted")
|
|
||||||
return
|
|
||||||
|
|
||||||
}
|
bIsV6 := netutils.IsIPv6(gw)
|
||||||
_, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, "--ip-src", plugin.podCidr, "-j", "DROP")...)
|
IPFamily := "IPv4"
|
||||||
if err != nil {
|
ipSrc := "--ip-src"
|
||||||
klog.Errorf("Failed to ensure packets from podCidr but has mac address of cbr0 to get dropped.")
|
if bIsV6 {
|
||||||
return
|
IPFamily = "IPv6"
|
||||||
|
ipSrc = "--ip6-src"
|
||||||
|
}
|
||||||
|
commonArgs := []string{"-p", IPFamily, "-s", macAddr.String(), "-o", "veth+"}
|
||||||
|
_, err = plugin.ebtables.EnsureRule(utilebtables.Prepend, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, gw.String(), "-j", "ACCEPT")...)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to ensure packets from cbr0 gateway:%v to be accepted with error:%v", gw.String(), err)
|
||||||
|
return
|
||||||
|
|
||||||
|
}
|
||||||
|
_, err = plugin.ebtables.EnsureRule(utilebtables.Append, utilebtables.TableFilter, dedupChain, append(commonArgs, ipSrc, plugin.podCIDRs[idx].String(), "-j", "DROP")...)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Failed to ensure packets from podCidr[%v] but has mac address of cbr0 to get dropped. err:%v", plugin.podCIDRs[idx].String(), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
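The loop above emits one ACCEPT/DROP pair per gateway, switching the ebtables match arguments by address family. A standalone sketch of that selection (not part of the patch; the gateway values are hypothetical and isIPv6 stands in for netutils.IsIPv6):

package main

import (
    "fmt"
    "net"
)

// isIPv6 mirrors the netutils.IsIPv6 check: an address with no 4-byte form is IPv6.
func isIPv6(ip net.IP) bool { return ip != nil && ip.To4() == nil }

func main() {
    // hypothetical per-family gateways on a dual-stack node
    for _, gw := range []net.IP{net.ParseIP("10.244.0.1"), net.ParseIP("2001:db8::1")} {
        ipFamily, ipSrc := "IPv4", "--ip-src"
        if isIPv6(gw) {
            ipFamily, ipSrc = "IPv6", "--ip6-src"
        }
        // one match expression per family, as the patched loop builds
        fmt.Printf("-p %s -s <mac> -o veth+ %s %s -j ACCEPT\n", ipFamily, ipSrc, gw)
    }
}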
@@ -690,3 +821,71 @@ func (plugin *kubenetNetworkPlugin) disableContainerDAD(id kubecontainer.Contain
     }
     return nil
 }
+
+// given a n cidrs assigned to nodes,
+// create bridge configuration that conforms to them
+func (plugin *kubenetNetworkPlugin) getRangesConfig() string {
+    createRange := func(thisNet *net.IPNet) string {
+        template := `
+[{
+"subnet": "%s",
+"gateway": "%s"
+}]`
+        return fmt.Sprintf(template, thisNet.String(), thisNet.IP.String())
+    }
+
+    ranges := make([]string, len(plugin.podCIDRs))
+    for idx, thisCIDR := range plugin.podCIDRs {
+        ranges[idx] = createRange(thisCIDR)
+    }
+    //[{range}], [{range}]
+    // each range is a subnet and a gateway
+    return strings.Join(ranges[:], ",")
+}
+
+func (plugin *kubenetNetworkPlugin) addPodIP(id kubecontainer.ContainerID, ip string) {
+    plugin.mu.Lock()
+    defer plugin.mu.Unlock()
+
+    _, exist := plugin.podIPs[id]
+    if !exist {
+        plugin.podIPs[id] = utilsets.NewString()
+    }
+
+    if !plugin.podIPs[id].Has(ip) {
+        plugin.podIPs[id].Insert(ip)
+    }
+}
+
+func (plugin *kubenetNetworkPlugin) removePodIP(id kubecontainer.ContainerID, ip string) {
+    plugin.mu.Lock()
+    defer plugin.mu.Unlock()
+
+    _, exist := plugin.podIPs[id]
+    if !exist {
+        return // did we restart kubelet?
+    }
+
+    if plugin.podIPs[id].Has(ip) {
+        plugin.podIPs[id].Delete(ip)
+    }
+
+    // if there is no more ips here. let us delete
+    if plugin.podIPs[id].Len() == 0 {
+        delete(plugin.podIPs, id)
+    }
+}
+
+// returns a copy of pod ips
+// false is returned if id does not exist
+func (plugin *kubenetNetworkPlugin) getCachedPodIPs(id kubecontainer.ContainerID) ([]string, bool) {
+    plugin.mu.Lock()
+    defer plugin.mu.Unlock()
+
+    iplist, exists := plugin.podIPs[id]
+    if !exists {
+        return nil, false
+    }
+
+    return iplist.UnsortedList(), true
+}
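To see the shape of the string getRangesConfig returns, here is a minimal out-of-tree sketch with hypothetical dual-stack CIDRs. One caveat: kubenet stores the gateway address inside each *net.IPNet, so thisNet.IP prints the gateway; plain net.ParseCIDR, used here for brevity, yields the masked network address instead:

package main

import (
    "fmt"
    "net"
    "strings"
)

func main() {
    createRange := func(thisNet *net.IPNet) string {
        return fmt.Sprintf(`
[{
"subnet": "%s",
"gateway": "%s"
}]`, thisNet.String(), thisNet.IP.String())
    }

    // hypothetical node pod CIDRs, one per family
    var podCIDRs []*net.IPNet
    for _, c := range []string{"10.244.1.0/24", "2001:db8:1::/64"} {
        _, n, _ := net.ParseCIDR(c)
        podCIDRs = append(podCIDRs, n)
    }

    ranges := make([]string, len(podCIDRs))
    for idx, thisCIDR := range podCIDRs {
        ranges[idx] = createRange(thisCIDR)
    }
    // two bracketed range groups joined by a comma, the form kubenet
    // splices into the host-local IPAM "ranges" field
    fmt.Println(strings.Join(ranges, ","))
}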
@@ -18,12 +18,13 @@ package kubenet
 
 import (
     "fmt"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
+    "strings"
 
     "testing"
 
+    utilsets "k8s.io/apimachinery/pkg/util/sets"
     kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/dockershim/network"
@@ -40,7 +41,7 @@ import (
 // test it fulfills the NetworkPlugin interface
 var _ network.NetworkPlugin = &kubenetNetworkPlugin{}
 
-func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) *kubenetNetworkPlugin {
+func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]utilsets.String, execer exec.Interface, host network.Host) *kubenetNetworkPlugin {
     return &kubenetNetworkPlugin{
         podIPs: initMap,
         execer: execer,
@@ -50,31 +51,38 @@ func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer e
 }
 
 func TestGetPodNetworkStatus(t *testing.T) {
-    podIPMap := make(map[kubecontainer.ContainerID]string)
-    podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2"
-    podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3"
+    podIPMap := make(map[kubecontainer.ContainerID]utilsets.String)
+    podIPMap[kubecontainer.ContainerID{ID: "1"}] = utilsets.NewString("10.245.0.2")
+    podIPMap[kubecontainer.ContainerID{ID: "2"}] = utilsets.NewString("10.245.0.3")
+    podIPMap[kubecontainer.ContainerID{ID: "3"}] = utilsets.NewString("10.245.0.4", "2000::")
+
     testCases := []struct {
         id          string
         expectError bool
-        expectIP    string
+        expectIP    utilsets.String
     }{
         //in podCIDR map
         {
-            "1",
-            false,
-            "10.245.0.2",
+            id:          "1",
+            expectError: false,
+            expectIP:    utilsets.NewString("10.245.0.2"),
         },
         {
-            "2",
-            false,
-            "10.245.0.3",
+            id:          "2",
+            expectError: false,
+            expectIP:    utilsets.NewString("10.245.0.3"),
         },
-        //not in podCIDR map
         {
-            "3",
-            true,
-            "",
+            id:          "3",
+            expectError: false,
+            expectIP:    utilsets.NewString("10.245.0.4", "2000::"),
+        },
+
+        //not in podIP map
+        {
+            id:          "does-not-exist-map",
+            expectError: true,
+            expectIP:    nil,
         },
         //TODO: add test cases for retrieving ip inside container network namespace
     }
@@ -85,11 +93,12 @@ func TestGetPodNetworkStatus(t *testing.T) {
     fCmd := fakeexec.FakeCmd{
         CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
             func() ([]byte, error) {
-                ip, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}]
+                ips, ok := podIPMap[kubecontainer.ContainerID{ID: t.id}]
                 if !ok {
                     return nil, fmt.Errorf("Pod IP %q not found", t.id)
                 }
-                return []byte(ip), nil
+                ipsList := ips.UnsortedList()
+                return []byte(ipsList[0]), nil
             },
         },
     }
@@ -119,9 +128,20 @@ func TestGetPodNetworkStatus(t *testing.T) {
                 t.Errorf("Test case %d expects error but got error: %v", i, err)
             }
         }
-        if tc.expectIP != out.IP.String() {
+        seen := make(map[string]bool)
+        allExpected := tc.expectIP.UnsortedList()
+        for _, expectedIP := range allExpected {
+            for _, outIP := range out.IPs {
+                if expectedIP == outIP.String() {
+                    seen[expectedIP] = true
+                    break
+                }
+            }
+        }
+        if len(tc.expectIP) != len(seen) {
             t.Errorf("Test case %d expects ip %s but got %s", i, tc.expectIP, out.IP.String())
         }
 
     }
 }
@@ -137,7 +157,8 @@ func TestTeardownCallsShaper(t *testing.T) {
     fhost := nettest.NewFakeHost(nil)
     fshaper := &bandwidth.FakeShaper{}
     mockcni := &mock_cni.MockCNI{}
-    kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
+    ips := make(map[kubecontainer.ContainerID]utilsets.String)
+    kubenet := newFakeKubenetPlugin(ips, fexec, fhost)
     kubenet.cniConfig = mockcni
     kubenet.iptables = ipttest.NewFake()
     kubenet.bandwidthShaper = fshaper
@@ -150,7 +171,7 @@ func TestTeardownCallsShaper(t *testing.T) {
     kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
 
     existingContainerID := kubecontainer.BuildContainerID("docker", "123")
-    kubenet.podIPs[existingContainerID] = "10.0.0.1"
+    kubenet.podIPs[existingContainerID] = utilsets.NewString("10.0.0.1")
 
     if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil {
         t.Fatalf("Unexpected error in TearDownPod: %v", err)
@@ -185,7 +206,8 @@ func TestInit_MTU(t *testing.T) {
     }
 
     fhost := nettest.NewFakeHost(nil)
-    kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
+    ips := make(map[kubecontainer.ContainerID]utilsets.String)
+    kubenet := newFakeKubenetPlugin(ips, fexec, fhost)
     kubenet.iptables = ipttest.NewFake()
 
     sysctl := sysctltest.NewFake()
@@ -203,22 +225,23 @@ func TestInit_MTU(t *testing.T) {
 // This is how kubenet is invoked from the cri.
 func TestTearDownWithoutRuntime(t *testing.T) {
     testCases := []struct {
-        podCIDR         string
+        podCIDR         []string
         ip              string
-        expectedGateway string
+        expectedGateway []string
     }{
         {
-            podCIDR:         "10.0.0.1/24",
+            podCIDR:         []string{"10.0.0.1/24"},
             ip:              "10.0.0.1",
-            expectedGateway: "10.0.0.1",
+            expectedGateway: []string{"10.0.0.1"},
         },
         {
-            podCIDR:         "2001:beef::1/48",
+            podCIDR:         []string{"2001:beef::1/48"},
             ip:              "2001:beef::1",
-            expectedGateway: "2001:beef::1",
+            expectedGateway: []string{"2001:beef::1"},
         },
     }
     for _, tc := range testCases {
 
         fhost := nettest.NewFakeHost(nil)
         fhost.Legacy = false
         mockcni := &mock_cni.MockCNI{}
@@ -230,22 +253,39 @@ func TestTearDownWithoutRuntime(t *testing.T) {
             },
         }
 
-        kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
+        ips := make(map[kubecontainer.ContainerID]utilsets.String)
+        kubenet := newFakeKubenetPlugin(ips, fexec, fhost)
         kubenet.cniConfig = mockcni
         kubenet.iptables = ipttest.NewFake()
 
         details := make(map[string]interface{})
-        details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = tc.podCIDR
+        details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = strings.Join(tc.podCIDR, ",")
         kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
 
-        if kubenet.gateway.String() != tc.expectedGateway {
-            t.Errorf("generated gateway: %q, expecting: %q", kubenet.gateway.String(), tc.expectedGateway)
+        if len(kubenet.podGateways) != len(tc.expectedGateway) {
+            t.Errorf("generated gateway: %q, expecting: %q are not of the same length", kubenet.podGateways, tc.expectedGateway)
+            continue
         }
-        if kubenet.podCidr != tc.podCIDR {
-            t.Errorf("generated podCidr: %q, expecting: %q", kubenet.podCidr, tc.podCIDR)
+
+        for idx := range tc.expectedGateway {
+            if kubenet.podGateways[idx].String() != tc.expectedGateway[idx] {
+                t.Errorf("generated gateway: %q, expecting: %q", kubenet.podGateways[idx].String(), tc.expectedGateway[idx])
+            }
         }
+
+        if len(kubenet.podCIDRs) != len(tc.podCIDR) {
+            t.Errorf("generated podCidr: %q, expecting: %q are not of the same length", kubenet.podCIDRs, tc.podCIDR)
+            continue
+        }
+        for idx := range tc.podCIDR {
+            if kubenet.podCIDRs[idx].String() != tc.podCIDR[idx] {
+                t.Errorf("generated podCidr: %q, expecting: %q", kubenet.podCIDRs[idx].String(), tc.podCIDR[idx])
+            }
+        }
+
         existingContainerID := kubecontainer.BuildContainerID("docker", "123")
-        kubenet.podIPs[existingContainerID] = tc.ip
+        kubenet.podIPs[existingContainerID] = utilsets.NewString(tc.ip)
 
         mockcni.On("DelNetwork", mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)
 
@@ -34,6 +34,9 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/dockershim/network/metrics"
     utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
     utilexec "k8s.io/utils/exec"
+
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    kubefeatures "k8s.io/kubernetes/pkg/features"
 )
 
 const (
@@ -89,6 +92,8 @@ type PodNetworkStatus struct {
     // - service endpoints are constructed with
     // - will be reported in the PodStatus.PodIP field (will override the IP reported by docker)
     IP net.IP `json:"ip" description:"Primary IP address of the pod"`
+    // IPs is the list of IPs assigned to Pod. IPs[0] == IP. The rest of the list is additional IPs
+    IPs []net.IP `json:"ips" description:"list of additional ips (inclusive of IP) assigned to pod"`
 }
 
 // Host is an interface that plugins can use to access the kubelet.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetPodIP gets the IP of the pod by inspecting the network info inside the pod's network namespace.
|
// GetPodIP gets the IP of the pod by inspecting the network info inside the pod's network namespace.
|
||||||
func GetPodIP(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) (net.IP, error) {
|
// TODO (khenidak). The "primary ip" in dual stack world does not really exist. For now
|
||||||
ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4")
|
// we are defaulting to v4 as primary
|
||||||
if err != nil {
|
func GetPodIPs(execer utilexec.Interface, nsenterPath, netnsPath, interfaceName string) ([]net.IP, error) {
|
||||||
// Fall back to IPv6 address if no IPv4 address is present
|
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.IPv6DualStack) {
|
||||||
ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6")
|
ip, err := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4")
|
||||||
}
|
if err != nil {
|
||||||
if err != nil {
|
// Fall back to IPv6 address if no IPv4 address is present
|
||||||
return nil, err
|
ip, err = getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6")
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return []net.IP{ip}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return ip, nil
|
list := make([]net.IP, 0)
|
||||||
|
var err4, err6 error
|
||||||
|
if ipv4, err4 := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-4"); err4 != nil {
|
||||||
|
list = append(list, ipv4)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ipv6, err6 := getOnePodIP(execer, nsenterPath, netnsPath, interfaceName, "-6"); err6 != nil {
|
||||||
|
list = append(list, ipv6)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(list) == 0 {
|
||||||
|
return nil, utilerrors.NewAggregate([]error{err4, err6})
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type NoopPortMappingGetter struct{}
|
type NoopPortMappingGetter struct{}
|
||||||
|
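The behavior of GetPodIPs splits on the IPv6DualStack feature gate: gate off keeps the old single-address semantics with an IPv6 fallback, gate on collects one address per family. A self-contained sketch of that flow, not the kubelet implementation itself (getOnePodIP here is a hypothetical stand-in for the nsenter-based lookup, and the sketch appends on a successful lookup):

package main

import (
    "errors"
    "fmt"
    "net"
)

// getOnePodIP is a hypothetical stand-in for the nsenter-based lookup.
func getOnePodIP(family string) (net.IP, error) {
    fake := map[string]string{"-4": "10.244.1.5", "-6": "2001:db8::5"}
    if s, ok := fake[family]; ok {
        return net.ParseIP(s), nil
    }
    return nil, errors.New("no address")
}

func getPodIPs(dualStack bool) ([]net.IP, error) {
    if !dualStack {
        ip, err := getOnePodIP("-4")
        if err != nil {
            ip, err = getOnePodIP("-6") // fall back to IPv6
        }
        if err != nil {
            return nil, err
        }
        return []net.IP{ip}, nil
    }
    var list []net.IP
    if ipv4, err := getOnePodIP("-4"); err == nil {
        list = append(list, ipv4)
    }
    if ipv6, err := getOnePodIP("-6"); err == nil {
        list = append(list, ipv6)
    }
    if len(list) == 0 {
        return nil, errors.New("no pod IPs found")
    }
    return list, nil
}

func main() {
    ips, _ := getPodIPs(true)
    fmt.Println(ips) // [10.244.1.5 2001:db8::5]
}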
@@ -27,6 +27,7 @@ import (
     "os"
     "path"
     "sort"
+    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -1520,8 +1521,10 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
     // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
     // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
     // set pod IP to hostIP directly in runtime.GetPodStatus
-    podStatus.IP = apiPodStatus.PodIP
+    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
+    for _, ipInfo := range apiPodStatus.PodIPs {
+        podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
+    }
     // Record the time it takes for the pod to become running.
     existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
     if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
@@ -2250,9 +2253,10 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
             klog.Errorf(err.Error())
             continue
         }
-        if node.Spec.PodCIDR != "" {
-            if _, err := kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
-                klog.Errorf("Pod CIDR update failed %v", err)
+        if len(node.Spec.PodCIDRs) != 0 {
+            podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
+            if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
+                klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
                 continue
             }
             kl.updateRuntimeUp()
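Because updatePodCIDR still takes a single string, the dual-stack list travels through the kubelet as one comma-separated value, which kubenet later splits again. A trivial sketch with hypothetical CIDRs:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // hypothetical node.Spec.PodCIDRs on a dual-stack node
    podCIDRs := []string{"10.244.1.0/24", "2001:db8:1::/64"}
    // the joined form updatePodCIDR receives
    joined := strings.Join(podCIDRs, ",")
    fmt.Println(joined) // 10.244.1.0/24,2001:db8:1::/64
}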
@@ -22,11 +22,10 @@ import (
     "net"
     goruntime "runtime"
     "sort"
+    "strings"
     "time"
 
-    "k8s.io/klog"
-
-    v1 "k8s.io/api/core/v1"
+    "k8s.io/api/core/v1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
@@ -34,6 +33,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     cloudprovider "k8s.io/cloud-provider"
+    "k8s.io/klog"
     k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     "k8s.io/kubernetes/pkg/features"
@@ -417,11 +417,12 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
     }
 
     podCIDRChanged := false
-    if node.Spec.PodCIDR != "" {
+    if len(node.Spec.PodCIDRs) != 0 {
         // Pod CIDR could have been updated before, so we cannot rely on
         // node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
         // actually changed.
-        if podCIDRChanged, err = kl.updatePodCIDR(node.Spec.PodCIDR); err != nil {
+        podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
+        if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
             klog.Errorf(err.Error())
         }
     }
@@ -23,6 +23,7 @@ import (
     goruntime "runtime"
     "sort"
     "strconv"
+    "strings"
     "sync/atomic"
     "testing"
     "time"
@@ -1004,11 +1005,12 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
     // Report node status if it is still within the duration of nodeStatusReportFrequency.
     clock.Step(10 * time.Second)
     assert.Equal(t, "", kubelet.runtimeState.podCIDR(), "Pod CIDR should be empty")
-    podCIDR := "10.0.0.0/24"
-    updatedNode.Spec.PodCIDR = podCIDR
+    podCIDRs := []string{"10.0.0.0/24", "2000::/10"}
+    updatedNode.Spec.PodCIDR = podCIDRs[0]
+    updatedNode.Spec.PodCIDRs = podCIDRs
     kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*updatedNode}}).ReactionChain
     assert.NoError(t, kubelet.updateNodeStatus())
-    assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
+    assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should be updated now")
     // 2 more action (There were 7 actions before).
     actions = kubeClient.Actions()
     assert.Len(t, actions, 9)
@@ -1019,7 +1021,8 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
     // Update node status when keeping the pod CIDR.
     // Do not report node status if it is within the duration of nodeStatusReportFrequency.
     clock.Step(10 * time.Second)
-    assert.Equal(t, podCIDR, kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
+    assert.Equal(t, strings.Join(podCIDRs, ","), kubelet.runtimeState.podCIDR(), "Pod CIDR should already be updated")
+
     assert.NoError(t, kubelet.updateNodeStatus())
     // Only 1 more action (There were 9 actions before).
     actions = kubeClient.Actions()
@@ -1382,7 +1382,16 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
 // alter the kubelet state at all.
 func (kl *Kubelet) convertStatusToAPIStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) *v1.PodStatus {
     var apiPodStatus v1.PodStatus
-    apiPodStatus.PodIP = podStatus.IP
+    apiPodStatus.PodIPs = make([]v1.PodIP, 0, len(podStatus.IPs))
+    for _, ip := range podStatus.IPs {
+        apiPodStatus.PodIPs = append(apiPodStatus.PodIPs, v1.PodIP{
+            IP: ip,
+        })
+    }
+
+    if len(apiPodStatus.PodIPs) > 0 {
+        apiPodStatus.PodIP = apiPodStatus.PodIPs[0].IP
+    }
     // set status for Pods created on versions of kube older than 1.6
     apiPodStatus.QOSClass = v1qos.GetPodQOS(pod)
 
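The mapping above keeps old clients working: PodIPs carries the full list and PodIP stays populated as its first entry. A self-contained sketch using local stand-ins for the v1.PodStatus and v1.PodIP fields (values hypothetical):

package main

import "fmt"

// Local stand-ins for the v1.PodStatus / v1.PodIP fields used above.
type PodIP struct{ IP string }
type PodStatus struct {
    PodIP  string
    PodIPs []PodIP
}

func main() {
    runtimeIPs := []string{"10.244.1.5", "2001:db8::5"} // hypothetical runtime-level IP list
    var api PodStatus
    api.PodIPs = make([]PodIP, 0, len(runtimeIPs))
    for _, ip := range runtimeIPs {
        api.PodIPs = append(api.PodIPs, PodIP{IP: ip})
    }
    // legacy single-IP field is always the first entry of the list
    if len(api.PodIPs) > 0 {
        api.PodIP = api.PodIPs[0].IP
    }
    fmt.Printf("%+v\n", api)
}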
@@ -663,18 +663,18 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine
     // by container garbage collector.
     m.pruneInitContainersBeforeStart(pod, podStatus)
 
-    // We pass the value of the podIP down to generatePodSandboxConfig and
+    // We pass the value of the PRIMARY podIP down to generatePodSandboxConfig and
     // generateContainerConfig, which in turn passes it to various other
     // functions, in order to facilitate functionality that requires this
     // value (hosts file and downward API) and avoid races determining
     // the pod IP in cases where a container requires restart but the
     // podIP isn't in the status manager yet.
     //
-    // We default to the IP in the passed-in pod status, and overwrite it if the
+    // We default to the IPs in the passed-in pod status, and overwrite them if the
     // sandbox needs to be (re)started.
-    podIP := ""
+    var podIPs []string
     if podStatus != nil {
-        podIP = podStatus.IP
+        podIPs = podStatus.IPs
     }
 
     // Step 4: Create a sandbox for the pod if necessary.
@@ -714,12 +714,20 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine
         // If we ever allow updating a pod from non-host-network to
         // host-network, we may use a stale IP.
         if !kubecontainer.IsHostNetworkPod(pod) {
-            // Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
-            podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
-            klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
+            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
+            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
+            klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
         }
     }
 
+    // the start containers routines depend on pod ip(as in primary pod ip)
+    // instead of trying to figure out if we have 0 < len(podIPs)
+    // everytime, we short circuit it here
+    podIP := ""
+    if len(podIPs) != 0 {
+        podIP = podIPs[0]
+    }
+
     // Get podSandboxConfig for containers to start.
     configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
     result.AddSyncResult(configPodSandboxResult)
|
|||||||
klog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q", podSandboxIDs, podFullName)
|
klog.V(4).Infof("getSandboxIDByPodUID got sandbox IDs %q for pod %q", podSandboxIDs, podFullName)
|
||||||
|
|
||||||
sandboxStatuses := make([]*runtimeapi.PodSandboxStatus, len(podSandboxIDs))
|
sandboxStatuses := make([]*runtimeapi.PodSandboxStatus, len(podSandboxIDs))
|
||||||
podIP := ""
|
podIPs := []string{}
|
||||||
for idx, podSandboxID := range podSandboxIDs {
|
for idx, podSandboxID := range podSandboxIDs {
|
||||||
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
|
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -891,7 +899,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
|
|||||||
|
|
||||||
// Only get pod IP from latest sandbox
|
// Only get pod IP from latest sandbox
|
||||||
if idx == 0 && podSandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
|
if idx == 0 && podSandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
|
||||||
podIP = m.determinePodSandboxIP(namespace, name, podSandboxStatus)
|
podIPs = m.determinePodSandboxIPs(namespace, name, podSandboxStatus)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -909,7 +917,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
|
|||||||
ID: uid,
|
ID: uid,
|
||||||
Name: name,
|
Name: name,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
IP: podIP,
|
IPs: podIPs,
|
||||||
SandboxStatuses: sandboxStatuses,
|
SandboxStatuses: sandboxStatuses,
|
||||||
ContainerStatuses: containerStatuses,
|
ContainerStatuses: containerStatuses,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -114,18 +114,29 @@ func makeFakePodSandbox(t *testing.T, m *kubeGenericRuntimeManager, template san
|
|||||||
assert.NoError(t, err, "generatePodSandboxConfig for sandbox template %+v", template)
|
assert.NoError(t, err, "generatePodSandboxConfig for sandbox template %+v", template)
|
||||||
|
|
||||||
podSandboxID := apitest.BuildSandboxName(config.Metadata)
|
podSandboxID := apitest.BuildSandboxName(config.Metadata)
|
||||||
return &apitest.FakePodSandbox{
|
podSandBoxStatus := &apitest.FakePodSandbox{
|
||||||
PodSandboxStatus: runtimeapi.PodSandboxStatus{
|
PodSandboxStatus: runtimeapi.PodSandboxStatus{
|
||||||
Id: podSandboxID,
|
Id: podSandboxID,
|
||||||
Metadata: config.Metadata,
|
Metadata: config.Metadata,
|
||||||
State: template.state,
|
State: template.state,
|
||||||
CreatedAt: template.createdAt,
|
CreatedAt: template.createdAt,
|
||||||
Network: &runtimeapi.PodSandboxNetworkStatus{
|
Network: &runtimeapi.PodSandboxNetworkStatus{
|
||||||
Ip: apitest.FakePodSandboxIP,
|
Ip: apitest.FakePodSandboxIPs[0],
|
||||||
},
|
},
|
||||||
Labels: config.Labels,
|
Labels: config.Labels,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
// assign additional IPs
|
||||||
|
additionalIPs := apitest.FakePodSandboxIPs[1:]
|
||||||
|
additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(additionalIPs))
|
||||||
|
for _, ip := range additionalIPs {
|
||||||
|
additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{
|
||||||
|
Ip: ip,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
podSandBoxStatus.Network.AdditionalIps = additionalPodIPs
|
||||||
|
return podSandBoxStatus
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeFakePodSandboxes creates a group of fake pod sandboxes based on the sandbox templates.
|
// makeFakePodSandboxes creates a group of fake pod sandboxes based on the sandbox templates.
|
||||||
@ -311,7 +322,7 @@ func TestGetPodStatus(t *testing.T) {
|
|||||||
assert.Equal(t, pod.UID, podStatus.ID)
|
assert.Equal(t, pod.UID, podStatus.ID)
|
||||||
assert.Equal(t, pod.Name, podStatus.Name)
|
assert.Equal(t, pod.Name, podStatus.Name)
|
||||||
assert.Equal(t, pod.Namespace, podStatus.Namespace)
|
assert.Equal(t, pod.Namespace, podStatus.Namespace)
|
||||||
assert.Equal(t, apitest.FakePodSandboxIP, podStatus.IP)
|
assert.Equal(t, apitest.FakePodSandboxIPs, podStatus.IPs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetPods(t *testing.T) {
|
func TestGetPods(t *testing.T) {
|
||||||
|
@ -214,20 +214,36 @@ func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeapi
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// determinePodSandboxIP determines the IP address of the given pod sandbox.
|
// determinePodSandboxIP determines the IP addresses of the given pod sandbox.
|
||||||
func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName string, podSandbox *runtimeapi.PodSandboxStatus) string {
|
func (m *kubeGenericRuntimeManager) determinePodSandboxIPs(podNamespace, podName string, podSandbox *runtimeapi.PodSandboxStatus) []string {
|
||||||
|
podIPs := make([]string, 0)
|
||||||
if podSandbox.Network == nil {
|
if podSandbox.Network == nil {
|
||||||
klog.Warningf("Pod Sandbox status doesn't have network information, cannot report IP")
|
klog.Warningf("Pod Sandbox status doesn't have network information, cannot report IPs")
|
||||||
return ""
|
return podIPs
|
||||||
}
|
}
|
||||||
ip := podSandbox.Network.Ip
|
|
||||||
if len(ip) != 0 && net.ParseIP(ip) == nil {
|
// ip could be an empty string if runtime is not responsible for the
|
||||||
// ip could be an empty string if runtime is not responsible for the
|
// IP (e.g., host networking).
|
||||||
// IP (e.g., host networking).
|
|
||||||
klog.Warningf("Pod Sandbox reported an unparseable IP %v", ip)
|
// pick primary IP
|
||||||
return ""
|
if len(podSandbox.Network.Ip) != 0 {
|
||||||
|
if net.ParseIP(podSandbox.Network.Ip) == nil {
|
||||||
|
klog.Warningf("Pod Sandbox reported an unparseable IP (Primary) %v", podSandbox.Network.Ip)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
podIPs = append(podIPs, podSandbox.Network.Ip)
|
||||||
}
|
}
|
||||||
return ip
|
|
||||||
|
// pick additional ips, if cri reported them
|
||||||
|
for _, podIP := range podSandbox.Network.AdditionalIps {
|
||||||
|
if nil == net.ParseIP(podIP.Ip) {
|
||||||
|
klog.Warningf("Pod Sandbox reported an unparseable IP (additional) %v", podIP.Ip)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
podIPs = append(podIPs, podIP.Ip)
|
||||||
|
}
|
||||||
|
|
||||||
|
return podIPs
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error).
|
// getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error).
|
||||||
|
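A self-contained sketch of the primary-plus-additional collection that determinePodSandboxIPs performs, using local stand-ins for the CRI PodSandboxNetworkStatus and PodIP types (field names mirror the runtime API; values hypothetical):

package main

import (
    "fmt"
    "net"
)

// Local stand-ins for runtimeapi.PodSandboxNetworkStatus / runtimeapi.PodIP.
type PodIP struct{ Ip string }
type NetworkStatus struct {
    Ip            string
    AdditionalIps []*PodIP
}

func determinePodSandboxIPs(status *NetworkStatus) []string {
    podIPs := make([]string, 0)
    if status == nil {
        return podIPs // runtime reported no network information
    }
    // primary IP, if the runtime is responsible for one
    if len(status.Ip) != 0 {
        if net.ParseIP(status.Ip) == nil {
            return nil // unparseable primary IP
        }
        podIPs = append(podIPs, status.Ip)
    }
    // additional IPs, if the CRI reported them
    for _, ip := range status.AdditionalIps {
        if net.ParseIP(ip.Ip) == nil {
            return nil // unparseable additional IP
        }
        podIPs = append(podIPs, ip.Ip)
    }
    return podIPs
}

func main() {
    status := &NetworkStatus{Ip: "10.244.1.5", AdditionalIps: []*PodIP{{Ip: "2001:db8::5"}}}
    fmt.Println(determinePodSandboxIPs(status)) // [10.244.1.5 2001:db8::5]
}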
@@ -108,10 +108,10 @@ func (hr *HandlerRunner) runHTTPHandler(pod *v1.Pod, container *v1.Container, ha
             klog.Errorf("Unable to get pod info, event handlers may be invalid.")
             return "", err
         }
-        if status.IP == "" {
+        if len(status.IPs) == 0 {
             return "", fmt.Errorf("failed to find networking container: %v", status)
         }
-        host = status.IP
+        host = status.IPs[0]
     }
     var port int
     if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
@@ -344,22 +344,22 @@ func (g *GenericPLEG) cacheEnabled() bool {
     return g.cache != nil
 }
 
-// getPodIP preserves an older cached status' pod IP if the new status has no pod IP
+// getPodIP preserves an older cached status' pod IP if the new status has no pod IPs
 // and its sandboxes have exited
-func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
-    if status.IP != "" {
-        return status.IP
+func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
+    if len(status.IPs) != 0 {
+        return status.IPs
     }
 
     oldStatus, err := g.cache.Get(pid)
-    if err != nil || oldStatus.IP == "" {
-        return ""
+    if err != nil || len(oldStatus.IPs) == 0 {
+        return nil
     }
 
     for _, sandboxStatus := range status.SandboxStatuses {
         // If at least one sandbox is ready, then use this status update's pod IP
         if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
-            return status.IP
+            return status.IPs
         }
     }
 
@@ -369,14 +369,14 @@ func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) s
     // running then use the new pod IP
     for _, containerStatus := range status.ContainerStatuses {
         if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
-            return status.IP
+            return status.IPs
         }
     }
     }
 
     // For pods with no ready containers or sandboxes (like exited pods)
     // use the old status' pod IP
-    return oldStatus.IP
+    return oldStatus.IPs
 }
 
 func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
@@ -398,7 +398,7 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
         // When a pod is torn down, kubelet may race with PLEG and retrieve
         // a pod status after network teardown, but the kubernetes API expects
         // the completed pod's IP to be available after the pod is dead.
-        status.IP = g.getPodIP(pid, status)
+        status.IPs = g.getPodIPs(pid, status)
     }
 
     g.cache.Set(pod.ID, status, err, timestamp)
@@ -573,56 +573,73 @@ func TestRelistingWithSandboxes(t *testing.T) {
 }
 
 func TestRelistIPChange(t *testing.T) {
-    pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
-    ch := pleg.Watch()
-
-    id := types.UID("test-pod-0")
-    cState := kubecontainer.ContainerStateRunning
-    container := createTestContainer("c0", cState)
-    pod := &kubecontainer.Pod{
-        ID:         id,
-        Containers: []*kubecontainer.Container{container},
+    testCases := []struct {
+        name   string
+        podID  string
+        podIPs []string
+    }{
+        {
+            name:   "test-0",
+            podID:  "test-pod-0",
+            podIPs: []string{"192.168.1.5"},
+        },
+        {
+            name:   "tets-1",
+            podID:  "test-pod-1",
+            podIPs: []string{"192.168.1.5/24", "2000::"},
+        },
     }
-    ipAddr := "192.168.1.5/24"
-    status := &kubecontainer.PodStatus{
-        ID: id,
-        IP: ipAddr,
-        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
-    }
-    event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
-
-    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
-    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
-
-    pleg.relist()
-    actualEvents := getEventsFromChannel(ch)
-    actualStatus, actualErr := pleg.cache.Get(pod.ID)
-    assert.Equal(t, status, actualStatus, "test0")
-    assert.Nil(t, actualErr, "test0")
-    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
-
-    // Clear the IP address and mark the container terminated
-    container = createTestContainer("c0", kubecontainer.ContainerStateExited)
-    pod = &kubecontainer.Pod{
-        ID:         id,
-        Containers: []*kubecontainer.Container{container},
-    }
-    status = &kubecontainer.PodStatus{
-        ID: id,
-        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
-    }
-    event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
-    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
-    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
-
-    pleg.relist()
-    actualEvents = getEventsFromChannel(ch)
-    actualStatus, actualErr = pleg.cache.Get(pod.ID)
-    // Must copy status to compare since its pointer gets passed through all
-    // the way to the event
-    statusCopy := *status
-    statusCopy.IP = ipAddr
-    assert.Equal(t, &statusCopy, actualStatus, "test0")
-    assert.Nil(t, actualErr, "test0")
-    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+    for _, tc := range testCases {
+        pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+        ch := pleg.Watch()
+
+        id := types.UID(tc.podID)
+        cState := kubecontainer.ContainerStateRunning
+        container := createTestContainer("c0", cState)
+        pod := &kubecontainer.Pod{
+            ID:         id,
+            Containers: []*kubecontainer.Container{container},
+        }
+        status := &kubecontainer.PodStatus{
+            ID:  id,
+            IPs: tc.podIPs,
+            ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+        }
+        event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+
+        runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+        runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+        pleg.relist()
+        actualEvents := getEventsFromChannel(ch)
+        actualStatus, actualErr := pleg.cache.Get(pod.ID)
+        assert.Equal(t, status, actualStatus, tc.name)
+        assert.Nil(t, actualErr, tc.name)
+        assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+
+        // Clear the IP address and mark the container terminated
+        container = createTestContainer("c0", kubecontainer.ContainerStateExited)
+        pod = &kubecontainer.Pod{
+            ID:         id,
+            Containers: []*kubecontainer.Container{container},
+        }
+        status = &kubecontainer.PodStatus{
+            ID: id,
+            ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
+        }
+        event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
+        runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+        runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+        pleg.relist()
+        actualEvents = getEventsFromChannel(ch)
+        actualStatus, actualErr = pleg.cache.Get(pod.ID)
+        // Must copy status to compare since its pointer gets passed through all
+        // the way to the event
+        statusCopy := *status
+        statusCopy.IPs = tc.podIPs
+        assert.Equal(t, &statusCopy, actualStatus, tc.name)
+        assert.Nil(t, actualErr, tc.name)
+        assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+    }
 }
@@ -175,7 +175,7 @@ func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) erro
     return nil
 }
 
-// PatchNodeCIDR patches the specified node's CIDR to the given value.
+// PatchNodeCIDRs patches the specified node.CIDR=cidrs[0] and node.CIDRs to the given value.
 func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error {
     rawCidrs, err := json.Marshal(cidrs)
     if err != nil {
|
|||||||
|
|
||||||
// set the pod cidrs list and set the old pod cidr field
|
// set the pod cidrs list and set the old pod cidr field
|
||||||
patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs))
|
patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs))
|
||||||
|
klog.V(4).Infof("cidrs patch bytes are:%s", string(patchBytes))
|
||||||
if _, err := c.CoreV1().Nodes().Patch(string(node), types.StrategicMergePatchType, patchBytes); err != nil {
|
if _, err := c.CoreV1().Nodes().Patch(string(node), types.StrategicMergePatchType, patchBytes); err != nil {
|
||||||
return fmt.Errorf("failed to patch node CIDR: %v", err)
|
return fmt.Errorf("failed to patch node CIDR: %v", err)
|
||||||
}
|
}
|
||||||
|
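With hypothetical CIDRs, the patch built above comes out as a strategic merge patch that sets both the legacy podCIDR field and the new podCIDRs list:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    cidrs := []string{"10.244.1.0/24", "2001:db8:1::/64"} // hypothetical
    rawCidrs, _ := json.Marshal(cidrs)
    rawCidr, _ := json.Marshal(cidrs[0])
    // same format string as the patch above
    patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s , "podCIDRs":%s}}`, rawCidr, rawCidrs))
    fmt.Println(string(patchBytes))
    // {"spec":{"podCIDR":"10.244.1.0/24" , "podCIDRs":["10.244.1.0/24","2001:db8:1::/64"]}}
}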
@@ -28,8 +28,8 @@ import (
 var (
     FakeVersion = "0.1.0"
 
     FakeRuntimeName   = "fakeRuntime"
-    FakePodSandboxIP  = "192.168.192.168"
+    FakePodSandboxIPs = []string{"192.168.192.168"}
 )
 
 type FakePodSandbox struct {
@@ -192,7 +192,7 @@ func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig,
             State:     runtimeapi.PodSandboxState_SANDBOX_READY,
             CreatedAt: createdAt,
             Network: &runtimeapi.PodSandboxNetworkStatus{
-                Ip: FakePodSandboxIP,
+                Ip: FakePodSandboxIPs[0],
             },
             Labels:      config.Labels,
             Annotations: config.Annotations,
@@ -200,7 +200,15 @@ func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig,
         },
         RuntimeHandler: runtimeHandler,
     }
+    // assign additional IPs
+    additionalIPs := FakePodSandboxIPs[1:]
+    additionalPodIPs := make([]*runtimeapi.PodIP, 0, len(additionalIPs))
+    for _, ip := range additionalIPs {
+        additionalPodIPs = append(additionalPodIPs, &runtimeapi.PodIP{
+            Ip: ip,
+        })
+    }
+    r.Sandboxes[podSandboxID].PodSandboxStatus.Network.AdditionalIps = additionalPodIPs
     return podSandboxID, nil
 }
 