diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 7b845c0b372..cc0ad3fe45e 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -118,6 +118,10 @@ type Runtime interface { ContainerAttacher // ImageService provides methods to image-related methods. ImageService + // UpdatePodCIDR sends a new podCIDR to the runtime. + // This method just proxies a new runtimeConfig with the updated + // CIDR value down to the runtime shim. + UpdatePodCIDR(podCIDR string) error } type ImageService interface { diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index bd2b29d91c7..1873512fa5f 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -118,6 +118,11 @@ func (f *FakeRuntime) ClearCalls() { f.StatusErr = nil } +// UpdatePodCIDR fulfills the cri interface. +func (f *FakeRuntime) UpdatePodCIDR(c string) error { + return nil +} + func (f *FakeRuntime) assertList(expect []string, test []string) error { if !reflect.DeepEqual(expect, test) { return fmt.Errorf("expected %#v, got %#v", expect, test) diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index a559ff0a317..b00ef98c681 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -153,3 +153,8 @@ func (r *Mock) ImageStats() (*ImageStats, error) { args := r.Called() return args.Get(0).(*ImageStats), args.Error(1) } + +// UpdatePodCIDR fulfills the cri interface. +func (r *Mock) UpdatePodCIDR(c string) error { + return nil +} diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index fcc0428a797..672e10c1326 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -27,11 +27,15 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/apis/componentconfig:go_default_library", "//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/leaky:go_default_library", + "//pkg/kubelet/network:go_default_library", + "//pkg/kubelet/network/cni:go_default_library", + "//pkg/kubelet/network/kubenet:go_default_library", "//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/server/streaming:go_default_library", "//pkg/kubelet/types:go_default_library", @@ -63,12 +67,16 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", + "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/dockertools:go_default_library", + "//pkg/kubelet/network:go_default_library", + "//pkg/kubelet/network/mock_network:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/security/apparmor:go_default_library", "//pkg/util/clock:go_default_library", "//vendor:github.com/docker/engine-api/types", + "//vendor:github.com/golang/mock/gomock", "//vendor:github.com/stretchr/testify/assert", ], ) diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 14860b10043..8586068a342 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + kubecontainer 
"k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -37,6 +38,9 @@ const ( // Termination grace period defaultSandboxGracePeriod int = 10 + + // Name of the underlying container runtime + runtimeName = "docker" ) // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure @@ -72,12 +76,48 @@ func (ds *dockerService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (str // Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) + if err != nil { + return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.GetName(), err) + } + if config.GetLinux().GetNamespaceOptions().GetHostNetwork() { + return createResp.ID, nil + } + + // Step 4: Setup networking for the sandbox. + // All pod networking is setup by a CNI plugin discovered at startup time. + // This plugin assigns the pod ip, sets up routes inside the sandbox, + // creates interfaces etc. In theory, its jurisdiction ends with pod + // sandbox networking, but it might insert iptables rules or open ports + // on the host as well, to satisfy parts of the pod spec that aren't + // recognized by the CNI standard yet. + cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) + err = ds.networkPlugin.SetUpPod(config.GetMetadata().GetNamespace(), config.GetMetadata().GetName(), cID) + // TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID? return createResp.ID, err } // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be force terminated. +// TODO: This function blocks sandbox teardown on networking teardown. Is it +// better to cut our losses assuming an out of band GC routine will cleanup +// after us? func (ds *dockerService) StopPodSandbox(podSandboxID string) error { + status, err := ds.PodSandboxStatus(podSandboxID) + if err != nil { + return fmt.Errorf("Failed to get sandbox status: %v", err) + } + if !status.GetLinux().GetNamespaces().GetOptions().GetHostNetwork() { + m := status.GetMetadata() + cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) + if err := ds.networkPlugin.TearDownPod(m.GetNamespace(), m.GetName(), cID); err != nil { + // TODO: Figure out a way to retry this error. We can't + // right now because the plugin throws errors when it doesn't find + // eth0, which might not exist for various reasons (setup failed, + // conf changed etc). In theory, it should teardown everything else + // so there's no need to retry. + glog.Errorf("Failed to teardown sandbox %v for pod %v/%v: %v", m.GetNamespace(), m.GetName(), podSandboxID, err) + } + } return ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod) // TODO: Stop all running containers in the sandbox. } @@ -89,6 +129,53 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error { // TODO: remove all containers in the sandbox. } +// getIPFromPlugin interrogates the network plugin for an IP. 
+func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (string, error) { + metadata, err := parseSandboxName(sandbox.Name) + if err != nil { + return "", err + } + msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", *metadata.Namespace, *metadata.Name) + if sharesHostNetwork(sandbox) { + return "", fmt.Errorf("%v: not responsible for host-network sandboxes", msg) + } + cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID) + networkStatus, err := ds.networkPlugin.GetPodNetworkStatus(*metadata.Namespace, *metadata.Name, cID) + if err != nil { + // This might be a sandbox that somehow ended up without a default + // interface (eth0). We can't distinguish this from a more serious + // error, so callers should probably treat it as non-fatal. + return "", fmt.Errorf("%v: %v", msg, err) + } + if networkStatus == nil { + return "", fmt.Errorf("%v: invalid network status for", msg) + } + return networkStatus.IP.String(), nil +} + +// getIP returns the ip given the output of `docker inspect` on a pod sandbox, +// first interrogating any registered plugins, then simply trusting the ip +// in the sandbox itself. We look for an ipv4 address before ipv6. +func (ds *dockerService) getIP(sandbox *dockertypes.ContainerJSON) (string, error) { + if sandbox.NetworkSettings == nil { + return "", nil + } + if IP, err := ds.getIPFromPlugin(sandbox); err != nil { + glog.Warningf("%v", err) + } else if IP != "" { + return IP, nil + } + // TODO: trusting the docker ip is not a great idea. However docker uses + // eth0 by default and so does CNI, so if we find a docker IP here, we + // conclude that the plugin must have failed setup, or forgotten its ip. + // This is not a sensible assumption for plugins across the board, but if + // a plugin doesn't want this behavior, it can throw an error. + if sandbox.NetworkSettings.IPAddress != "" { + return sandbox.NetworkSettings.IPAddress, nil + } + return sandbox.NetworkSettings.GlobalIPv6Address, nil +} + // PodSandboxStatus returns the status of the PodSandbox. func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error) { // Inspect the container. @@ -109,20 +196,9 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS if r.State.Running { state = runtimeApi.PodSandBoxState_READY } - - // TODO: We can't really get the IP address from the network plugin, which - // is handled by kubelet as of now. Should we amend the interface? How is - // this handled in the new remote runtime integration? - // See DockerManager.determineContainerIP() for more details. - // For now, just assume that there is no network plugin. 
- // Related issue: https://github.com/kubernetes/kubernetes/issues/28667 - var IP string - if r.NetworkSettings != nil { - IP = r.NetworkSettings.IPAddress - // Fall back to IPv6 address if no IPv4 address is present - if IP == "" { - IP = r.NetworkSettings.GlobalIPv6Address - } + IP, err := ds.getIP(r) + if err != nil { + return nil, err } network := &runtimeApi.PodSandboxNetworkStatus{Ip: &IP} netNS := getNetworkNamespace(r) @@ -131,7 +207,7 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS if err != nil { return nil, err } - + hostNetwork := sharesHostNetwork(r) labels, annotations := extractLabels(r.Config.Labels) return &runtimeApi.PodSandboxStatus{ Id: &r.ID, @@ -141,7 +217,14 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS Labels: labels, Annotations: annotations, Network: network, - Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &netNS}}, + Linux: &runtimeApi.LinuxPodSandboxStatus{ + Namespaces: &runtimeApi.Namespace{ + Network: &netNS, + Options: &runtimeApi.NamespaceOption{ + HostNetwork: &hostNetwork, + }, + }, + }, }, nil } @@ -279,6 +362,15 @@ func (ds *dockerService) makeSandboxDockerConfig(c *runtimeApi.PodSandboxConfig, return createConfig, nil } +// sharesHostNetwork returns true if the given container is sharing the host's +// network namespace. +func sharesHostNetwork(container *dockertypes.ContainerJSON) bool { + if container != nil && container.HostConfig != nil { + return string(container.HostConfig.NetworkMode) == namespaceModeHost + } + return false +} + func setSandboxResources(hc *dockercontainer.HostConfig) { hc.Resources = dockercontainer.Resources{ MemorySwap: -1, // Always disable memory swap. diff --git a/pkg/kubelet/dockershim/docker_sandbox_test.go b/pkg/kubelet/dockershim/docker_sandbox_test.go index 9070098811f..baaadcd86bb 100644 --- a/pkg/kubelet/dockershim/docker_sandbox_test.go +++ b/pkg/kubelet/dockershim/docker_sandbox_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -99,12 +100,13 @@ func TestSandboxStatus(t *testing.T) { state := runtimeApi.PodSandBoxState_READY ct := int64(0) + hostNetwork := false expected := &runtimeApi.PodSandboxStatus{ State: &state, CreatedAt: &ct, Metadata: config.Metadata, Network: &runtimeApi.PodSandboxNetworkStatus{Ip: &fakeIP}, - Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &fakeNS}}, + Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &fakeNS, Options: &runtimeApi.NamespaceOption{HostNetwork: &hostNetwork}}}, Labels: labels, Annotations: annotations, } @@ -138,3 +140,57 @@ func TestSandboxStatus(t *testing.T) { status, err = ds.PodSandboxStatus(id) assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", status)) } + +// TestNetworkPluginInvocation checks that the right SetUpPod and TearDownPod +// calls are made when we run/stop a sandbox.
+func TestNetworkPluginInvocation(t *testing.T) { + ds, _, _ := newTestDockerService() + mockPlugin := newTestNetworkPlugin(t) + ds.networkPlugin = mockPlugin + defer mockPlugin.Finish() + + name := "foo0" + ns := "bar0" + c := makeSandboxConfigWithLabelsAndAnnotations( + name, ns, "0", 0, + map[string]string{"label": name}, + map[string]string{"annotation": ns}, + ) + cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))} + + setup := mockPlugin.EXPECT().SetUpPod(ns, name, cID) + // StopPodSandbox performs a lookup on status to figure out if the sandbox + // is running with host networking, as all it's given is the ID. + mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID) + mockPlugin.EXPECT().TearDownPod(ns, name, cID).After(setup) + + _, err := ds.RunPodSandbox(c) + assert.NoError(t, err) + err = ds.StopPodSandbox(cID.ID) + assert.NoError(t, err) +} + +// TestHostNetworkPluginInvocation checks that *no* SetUp/TearDown calls happen +// for host network sandboxes. +func TestHostNetworkPluginInvocation(t *testing.T) { + ds, _, _ := newTestDockerService() + mockPlugin := newTestNetworkPlugin(t) + ds.networkPlugin = mockPlugin + defer mockPlugin.Finish() + + name := "foo0" + ns := "bar0" + c := makeSandboxConfigWithLabelsAndAnnotations( + name, ns, "0", 0, + map[string]string{"label": name}, + map[string]string{"annotation": ns}, + ) + hostNetwork := true + c.Linux = &runtimeApi.LinuxPodSandboxConfig{NamespaceOptions: &runtimeApi.NamespaceOption{HostNetwork: &hostNetwork}} + cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))} + + // No calls to network plugin are expected + _, err := ds.RunPodSandbox(c) + assert.NoError(t, err) + assert.NoError(t, ds.StopPodSandbox(cID.ID)) +} diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index b37a714ec03..f96a3f32695 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -20,11 +20,16 @@ import ( "fmt" "io" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/componentconfig" internalApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" + "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/cni" + "k8s.io/kubernetes/pkg/kubelet/network/kubenet" "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/util/term" ) @@ -53,10 +58,41 @@ const ( sandboxIDLabelKey = "io.kubernetes.sandbox.id" ) +// NetworkPluginSettings is the subset of kubelet runtime args we pass +// to the container runtime shim so it can probe for network plugins. +// In the future we will feed these directly to a standalone container +// runtime process. +type NetworkPluginSettings struct { + // HairpinMode is best described by comments surrounding the kubelet arg + HairpinMode componentconfig.HairpinMode + // NonMasqueradeCIDR is the range of ips which should *not* be included + // in any MASQUERADE rules applied by the plugin + NonMasqueradeCIDR string + // PluginName is the name of the plugin the runtime shim probes for + PluginName string + // PluginBinDir is the directory in which the binaries for the plugin with + // PluginName are kept. The admin is responsible for provisioning these + // binaries beforehand.
+ PluginBinDir string + // PluginConfDir is the directory in which the admin places a CNI conf. + // Depending on the plugin, this may be an optional field, eg: kubenet + // generates its own plugin conf. + PluginConfDir string + // MTU is the desired MTU for network devices created by the plugin. + MTU int + + // RuntimeHost is an interface that serves as a trap-door from plugin back + // into the kubelet. + // TODO: This shouldn't be required, remove once we move host ports into CNI + // and figure out bandwidth shaping. See corresponding comments above + // network.Host interface. + LegacyRuntimeHost network.LegacyHost +} + var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey} // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. -func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) { +func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings) (DockerService, error) { ds := &dockerService{ seccompProfileRoot: seccompProfileRoot, client: dockertools.NewInstrumentedDockerInterface(client), @@ -76,6 +112,19 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str return nil, err } } + // dockershim currently only supports CNI plugins. + cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDir) + cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDir)) + netHost := &dockerNetworkHost{ + pluginSettings.LegacyRuntimeHost, + &namespaceGetter{ds}, + } + plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU) + if err != nil { + return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err) + } + ds.networkPlugin = plug + glog.Infof("Docker cri networking managed by %v", plug.Name()) return ds, nil } @@ -105,6 +154,7 @@ type dockerService struct { podSandboxImage string streamingRuntime *streamingRuntime streamingServer streaming.Server + networkPlugin network.NetworkPlugin } // Version returns the runtime name, runtime version and runtime API version @@ -126,6 +176,41 @@ func (ds *dockerService) Version(_ string) (*runtimeApi.VersionResponse, error) }, nil } -func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { - return nil +// UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates. +func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) (err error) { + if runtimeConfig == nil { + return + } + glog.Infof("docker cri received runtime config %+v", runtimeConfig) + if ds.networkPlugin != nil && runtimeConfig.NetworkConfig.PodCidr != nil { + event := make(map[string]interface{}) + event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = *runtimeConfig.NetworkConfig.PodCidr + ds.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) + } + return +} + +// namespaceGetter is a wrapper around the dockerService that implements +// the network.NamespaceGetter interface. 
+type namespaceGetter struct { + *dockerService +} + +// GetNetNS returns the network namespace of the given containerID. The ID +// supplied is typically the ID of a pod sandbox. This getter doesn't try +// to map non-sandbox IDs to their respective sandboxes. +func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) { + r, err := ds.client.InspectContainer(podSandboxID) + if err != nil { + return "", err + } + return getNetworkNamespace(r), nil +} + +// dockerNetworkHost implements network.Host by wrapping the legacy host +// passed in by the kubelet and adding NamespaceGetter methods. The legacy +// host methods are slated for deletion. +type dockerNetworkHost struct { + network.LegacyHost + *namespaceGetter } diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 2784f998482..3314c176fd3 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -17,15 +17,25 @@ limitations under the License. package dockershim import ( + "github.com/golang/mock/gomock" + "testing" "time" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockertools" + "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/mock_network" "k8s.io/kubernetes/pkg/util/clock" ) +// newTestNetworkPlugin returns a mock plugin that implements network.NetworkPlugin +func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin { + ctrl := gomock.NewController(t) + return mock_network.NewMockNetworkPlugin(ctrl) +} + func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := dockertools.NewFakeDockerClientWithClock(fakeClock) - return &dockerService{client: c, os: &containertest.FakeOS{}}, c, fakeClock + return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}}, c, fakeClock } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 4640ab91cc7..b4e7dc667d1 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -1321,6 +1321,12 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream return PortForward(dm.client, podInfraContainer.ID.ID, port, stream) } +// UpdatePodCIDR updates the podCIDR for the runtime. +// Currently no-ops, just implemented to satisfy the cri. +func (dm *DockerManager) UpdatePodCIDR(podCIDR string) error { + return nil +} + // Temporarily export this function to share with dockershim. 
func PortForward(client DockerInterface, podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error { container, err := client.InspectContainer(podInfraContainerID) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 372e8577ddd..4751a8b8996 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -456,7 +456,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } glog.Infof("Hairpin mode set to %q", klet.hairpinMode) - if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil { + if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil { return nil, err } else { klet.networkPlugin = plug @@ -482,6 +482,26 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } } + // TODO: These need to become arguments to a standalone docker shim. + binDir := kubeCfg.CNIBinDir + if binDir == "" { + binDir = kubeCfg.NetworkPluginDir + } + pluginSettings := dockershim.NetworkPluginSettings{ + HairpinMode: klet.hairpinMode, + NonMasqueradeCIDR: klet.nonMasqueradeCIDR, + PluginName: kubeCfg.NetworkPluginName, + PluginConfDir: kubeCfg.CNIConfDir, + PluginBinDir: binDir, + MTU: int(kubeCfg.NetworkPluginMTU), + } + + // Remote runtime shim just cannot talk back to kubelet, so it doesn't + // support bandwidth shaping or hostports till #35457. To enable legacy + // features, replace with networkHost. + var nl *noOpLegacyHost + pluginSettings.LegacyRuntimeHost = nl + // Initialize the runtime. switch kubeCfg.ContainerRuntime { case "docker": @@ -489,10 +509,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub case "cri": // Use the new CRI shim for docker. This is needed for testing the // docker integration through CRI, and may be removed in the future. - dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil) - if err != nil { - return nil, err - } + dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings) runtimeService := dockerService.(internalApi.RuntimeService) imageService := dockerService.(internalApi.ImageManagerService) @@ -520,6 +537,13 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub return nil, err } } + + // kubelet defers to the runtime shim to setup networking. Setting + // this to nil will prevent it from trying to invoke the plugin. + // It's easier to always probe and initialize plugins till cri + // becomes the default. + klet.networkPlugin = nil + klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, @@ -1202,6 +1226,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { kl.syncLoop(updates, kl) } +// GetKubeClient returns the Kubernetes client. +// TODO: This is currently only required by network plugins. Replace +// with more specific methods. +func (kl *Kubelet) GetKubeClient() clientset.Interface { + return kl.kubeClient +} + // GetClusterDNS returns a list of the DNS servers and a list of the DNS search // domains of the cluster. 
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) { diff --git a/pkg/kubelet/kubelet_network.go b/pkg/kubelet/kubelet_network.go index 753630a1017..af6cb86940a 100644 --- a/pkg/kubelet/kubelet_network.go +++ b/pkg/kubelet/kubelet_network.go @@ -191,7 +191,20 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error { // syncNetworkStatus updates the network state func (kl *Kubelet) syncNetworkStatus() { - kl.runtimeState.setNetworkState(kl.networkPlugin.Status()) + // TODO(#35701): cri shim handles network plugin but we currently + // don't have a cri status hook, so network plugin status isn't + // reported if --experimental-runtime-integration=cri. This isn't + // too bad, because kubenet is the only network plugin that + // implements status(), and it just checks for plugin binaries + // on the filesystem. + if kl.networkPlugin != nil { + kl.runtimeState.setNetworkState(kl.networkPlugin.Status()) + } else if kl.runtimeState.podCIDR() != "" { + // Don't mark the node ready till we've successfully executed + // the first UpdatePodCIDR call through cri. See comment above + // setPodCIDR call. + kl.runtimeState.setNetworkState(nil) + } } // updatePodCIDR updates the pod CIDR in the runtime state if it is different @@ -203,14 +216,26 @@ func (kl *Kubelet) updatePodCIDR(cidr string) { return } - glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr) - kl.runtimeState.setPodCIDR(cidr) - + // kubelet -> network plugin + // cri runtime shims are responsible for their own network plugins if kl.networkPlugin != nil { details := make(map[string]interface{}) details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = cidr kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details) } + + // kubelet -> generic runtime -> runtime shim -> network plugin + // docker/rkt non-cri implementations have a passthrough UpdatePodCIDR + if err := kl.GetRuntime().UpdatePodCIDR(cidr); err != nil { + glog.Errorf("Failed to update pod CIDR: %v", err) + return + } + + // We need to be careful about setting podCIDR. Till #35839 lands we're + // using it to indicate network plugin status for cri shims. See comment + // in syncNetworkStatus. + glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr) + kl.runtimeState.setPodCIDR(cidr) } // shapingEnabled returns whether traffic shaping is enabled. @@ -219,6 +244,16 @@ func (kl *Kubelet) shapingEnabled() bool { if kl.networkPlugin != nil && kl.networkPlugin.Capabilities().Has(network.NET_PLUGIN_CAPABILITY_SHAPING) { return false } + // This is not strictly true but we need to figure out how to handle + // bandwidth shaping anyway. If the kubelet doesn't have a networkPlugin, + // it could mean: + // a. the kubelet is responsible for bandwidth shaping + // b. the kubelet is using cri, and the cri has a network plugin + // Today, the only plugin that understands bandwidth shaping is kubenet, and + // it doesn't support bandwidth shaping when invoked through cri, so it + // effectively boils down to letting the kubelet decide how to handle + // shaping annotations. The combination of (cri + network plugin that + // handles bandwidth shaping) may not work because of this. 
return true } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 51f4c3e4baf..b0f4378cc4a 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -641,41 +641,16 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, podSt return } - setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, podSandboxID) - result.AddSyncResult(setupNetworkResult) - if !kubecontainer.IsHostNetworkPod(pod) { - glog.V(3).Infof("Calling network plugin %s to setup pod for %s", m.networkPlugin.Name(), format.Pod(pod)) - // Setup pod network plugin with sandbox id - // TODO: rename the last param to sandboxID - err = m.networkPlugin.SetUpPod(pod.Namespace, pod.Name, kubecontainer.ContainerID{ - Type: m.runtimeName, - ID: podSandboxID, - }) - if err != nil { - message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v", format.Pod(pod), m.networkPlugin.Name(), err) - setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message) - glog.Error(message) - - killPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, format.Pod(pod)) - result.AddSyncResult(killPodSandboxResult) - if err := m.runtimeService.StopPodSandbox(podSandboxID); err != nil { - killPodSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error()) - glog.Errorf("Kill sandbox %q failed for pod %q: %v", podSandboxID, format.Pod(pod), err) - } - return - } - - podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) - if err != nil { - glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod)) - result.Fail(err) - return - } - - // Overwrite the podIP passed in the pod status, since we just started the infra container. - podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus) - glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod)) + podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) + if err != nil { + glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod)) + result.Fail(err) + return } + + // Overwrite the podIP passed in the pod status, since we just started the pod sandbox. + podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus) + glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod)) } // Get podSandboxConfig for containers to start. 
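The SyncPod hunk above relies on the runtime shim to report the pod IP in the sandbox status rather than asking a kubelet-side network plugin; the matching determinePodSandboxIP rewrite appears further down in this diff. Below is a minimal, self-contained sketch of that lookup, using simplified stand-in types rather than the generated runtimeApi structs.

package main

import (
	"fmt"
	"net"
)

// Simplified stand-ins for runtimeApi.PodSandboxStatus / PodSandboxNetworkStatus;
// the names and shapes here are illustrative only.
type podSandboxNetworkStatus struct{ IP string }
type podSandboxStatus struct{ Network *podSandboxNetworkStatus }

// determinePodSandboxIP mirrors the new behavior: trust the IP the runtime
// shim put in the sandbox status, but drop anything missing or unparseable
// instead of falling back to a kubelet-side network plugin.
func determinePodSandboxIP(status *podSandboxStatus) string {
	if status.Network == nil {
		fmt.Println("warning: sandbox status has no network information")
		return ""
	}
	if net.ParseIP(status.Network.IP) == nil {
		fmt.Printf("warning: sandbox reported unparseable IP %q\n", status.Network.IP)
		return ""
	}
	return status.Network.IP
}

func main() {
	fmt.Println(determinePodSandboxIP(&podSandboxStatus{Network: &podSandboxNetworkStatus{IP: "10.244.1.3"}})) // "10.244.1.3"
	fmt.Println(determinePodSandboxIP(&podSandboxStatus{Network: &podSandboxNetworkStatus{IP: "not-an-ip"}}))  // ""
}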
@@ -815,33 +790,6 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *api.Pod, runningP result.AddSyncResult(containerResult) } - // Teardown network plugin - if len(runningPod.Sandboxes) == 0 { - glog.V(4).Infof("Can not find pod sandbox by UID %q, assuming already removed.", runningPod.ID) - return - } - - sandboxID := runningPod.Sandboxes[0].ID.ID - isHostNetwork, err := m.isHostNetwork(sandboxID, pod) - if err != nil { - result.Fail(err) - return - } - if !isHostNetwork { - teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, runningPod.ID) - result.AddSyncResult(teardownNetworkResult) - // Tear down network plugin with sandbox id - if err := m.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubecontainer.ContainerID{ - Type: m.runtimeName, - ID: sandboxID, - }); err != nil { - message := fmt.Sprintf("Failed to teardown network for pod %s_%s(%s) using network plugins %q: %v", - runningPod.Name, runningPod.Namespace, runningPod.ID, m.networkPlugin.Name(), err) - teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message) - glog.Error(message) - } - } - // stop sandbox, the sandbox will be removed in GarbageCollect killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID) result.AddSyncResult(killSandboxResult) @@ -1006,3 +954,17 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin return fmt.Errorf("not implemented") } + +// UpdatePodCIDR is just a passthrough method to update the runtimeConfig of the shim +// with the podCIDR supplied by the kubelet. +func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error { + // TODO(#35531): do we really want to write a method on this manager for each + // field of the config? + glog.Infof("updating runtime config through cri with podcidr %v", podCIDR) + return m.runtimeService.UpdateRuntimeConfig( + &runtimeApi.RuntimeConfig{ + NetworkConfig: &runtimeApi.NetworkConfig{ + PodCidr: &podCIDR, + }, + }) +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go index e10cf156a21..a2d5232025e 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go @@ -18,13 +18,13 @@ package kuberuntime import ( "fmt" + "net" "sort" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" ) @@ -180,27 +180,16 @@ func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeApi } // determinePodSandboxIP determines the IP address of the given pod sandbox. -// TODO: remove determinePodSandboxIP after networking is delegated to the container runtime. 
func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName string, podSandbox *runtimeApi.PodSandboxStatus) string { - ip := "" - - if podSandbox.Network != nil { - ip = podSandbox.Network.GetIp() + if podSandbox.Network == nil { + glog.Warningf("Pod Sandbox status doesn't have network information, cannot report IP") + return "" } - - if m.networkPlugin.Name() != network.DefaultPluginName { - // TODO: podInfraContainerID in GetPodNetworkStatus() interface should be renamed to sandboxID - netStatus, err := m.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.ContainerID{ - Type: m.runtimeName, - ID: podSandbox.GetId(), - }) - if err != nil { - glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", m.networkPlugin.Name(), kubecontainer.BuildPodFullName(podName, podNamespace), err) - } else if netStatus != nil { - ip = netStatus.IP.String() - } + ip := podSandbox.Network.GetIp() + if net.ParseIP(ip) == nil { + glog.Warningf("Pod Sandbox reported an unparseable IP %v", ip) + return "" } - return ip } diff --git a/pkg/kubelet/network/cni/cni.go b/pkg/kubelet/network/cni/cni.go index 67f1c47404a..69667df419e 100644 --- a/pkg/kubelet/network/cni/cni.go +++ b/pkg/kubelet/network/cni/cni.go @@ -193,7 +193,7 @@ func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubec if err := plugin.checkInitialized(); err != nil { return err } - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } @@ -217,7 +217,7 @@ func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id ku if err := plugin.checkInitialized(); err != nil { return err } - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } @@ -228,7 +228,7 @@ func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id ku // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin. 
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index 41fd2786082..8aeed7a93fd 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -138,6 +138,14 @@ func (fnh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime { return fnh.runtime } +func (fnh *fakeNetworkHost) GetNetNS(containerID string) (string, error) { + return fnh.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "test", ID: containerID}) +} + +func (fnh *fakeNetworkHost) SupportsLegacyFeatures() bool { + return true +} + func TestCNIPlugin(t *testing.T) { // install some random plugin pluginName := fmt.Sprintf("test%d", rand.Intn(1000)) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 717aefc8e62..7679dce6d76 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -334,6 +334,9 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int { return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING) } +// setup sets up networking through CNI using the given ns/name and sandbox ID. +// TODO: Don't pass the pod to this method, it only needs it for bandwidth +// shaping and hostport management. func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error { // Bring up container loopback interface if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { @@ -384,6 +387,14 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube plugin.syncEbtablesDedupRules(macAddr) } + plugin.podIPs[id] = ip4.String() + + // The host can choose to not support "legacy" features. The remote + // shim doesn't support it (#35457), but the kubelet does. + if !plugin.host.SupportsLegacyFeatures() { + return nil + } + // The first SetUpPod call creates the bridge; get a shaper for the sake of // initialization shaper := plugin.shaper() @@ -398,8 +409,6 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube } } - plugin.podIPs[id] = ip4.String() - // Open any hostports the pod's containers want activePods, err := plugin.getActivePods() if err != nil { @@ -423,6 +432,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name) }() + // TODO: Entire pod object only required for bw shaping and hostport. pod, ok := plugin.host.GetPodByName(namespace, name) if !ok { return fmt.Errorf("pod %q cannot be found", name) @@ -440,15 +450,20 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err) } - // TODO: Remove this hack once we've figured out how to retrieve the netns - // of an exited container. Currently, restarting docker will leak a bunch of - // ips. 
This will exhaust available ip space unless we cleanup old ips. At the - // same time we don't want to try GC'ing them periodically as that could lead - // to a performance regression in starting pods. So on each setup failure, try - // GC on the assumption that the kubelet is going to retry pod creation, and - // when it does, there will be ips. - plugin.ipamGarbageCollection() + // TODO(#34278): Figure out if we need IP GC through the cri. + // The cri should always send us teardown events for stale sandboxes, + // this obviates the need for GC in the common case, for kubenet. + if plugin.host.SupportsLegacyFeatures() { + // TODO: Remove this hack once we've figured out how to retrieve the netns + // of an exited container. Currently, restarting docker will leak a bunch of + // ips. This will exhaust available ip space unless we cleanup old ips. At the + // same time we don't want to try GC'ing them periodically as that could lead + // to a performance regression in starting pods. So on each setup failure, try + // GC on the assumption that the kubelet is going to retry pod creation, and + // when it does, there will be ips. + plugin.ipamGarbageCollection() + } return err } @@ -485,6 +500,12 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k } } + // The host can choose to not support "legacy" features. The remote + // shim doesn't support it (#35457), but the kubelet does. + if !plugin.host.SupportsLegacyFeatures() { + return utilerrors.NewAggregate(errList) + } + activePods, err := plugin.getActivePods() if err == nil { err = plugin.hostportHandler.SyncHostports(BridgeName, activePods) @@ -533,7 +554,7 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil } - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } @@ -722,7 +743,7 @@ func podIsExited(p *kubecontainer.Pod) bool { } func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) { - netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) + netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err) } diff --git a/pkg/kubelet/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/network/kubenet/kubenet_linux_test.go index e3d129b06ce..08cd7d5a908 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux_test.go @@ -229,4 +229,40 @@ func TestGenerateMacAddress(t *testing.T) { } } +// TestInvocationWithoutRuntime invokes the plugin without a runtime. +// This is how kubenet is invoked from the cri. 
+func TestTearDownWithoutRuntime(t *testing.T) { + fhost := nettest.NewFakeHost(nil) + fhost.Legacy = false + fhost.Runtime = nil + mockcni := &mock_cni.MockCNI{} + + fexec := &exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{}, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + + kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost) + kubenet.cniConfig = mockcni + kubenet.iptables = ipttest.NewFake() + + details := make(map[string]interface{}) + details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = "10.0.0.1/24" + kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details) + + existingContainerID := kubecontainer.BuildContainerID("docker", "123") + kubenet.podIPs[existingContainerID] = "10.0.0.1" + + mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) + + if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil { + t.Fatalf("Unexpected error in TearDownPod: %v", err) + } + // Assert that the CNI DelNetwork made it through and we didn't crash + // without a runtime. + mockcni.AssertExpectations(t) +} + //TODO: add unit test for each implementation of network plugin interface diff --git a/pkg/kubelet/network/mock_network/network_plugins.go b/pkg/kubelet/network/mock_network/network_plugins.go index 71e50530d4d..a56bf252313 100644 --- a/pkg/kubelet/network/mock_network/network_plugins.go +++ b/pkg/kubelet/network/mock_network/network_plugins.go @@ -55,6 +55,10 @@ func (_m *MockNetworkPlugin) Capabilities() sets.Int { return ret0 } +func (_m *MockNetworkPlugin) Finish() { + _m.ctrl.Finish() +} + func (_mr *_MockNetworkPluginRecorder) Capabilities() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "Capabilities") } diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index c5e6ae26c28..9e7a1b65c26 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -95,16 +95,53 @@ type PodNetworkStatus struct { IP net.IP `json:"ip" description:"Primary IP address of the pod"` } -// Host is an interface that plugins can use to access the kubelet. -type Host interface { +// LegacyHost defines the methods required by network plugins that +// were directly invoked by the kubelet. Implementations of this interface +// that do not wish to support these features can simply return false +// from SupportsLegacyFeatures. +type LegacyHost interface { // Get the pod structure by its name, namespace + // Only used for hostport management and bw shaping GetPodByName(namespace, name string) (*api.Pod, bool) // GetKubeClient returns a client interface + // Only used in testing GetKubeClient() clientset.Interface // GetContainerRuntime returns the container runtime that implements the containers (e.g. docker/rkt) + // Only used for hostport management GetRuntime() kubecontainer.Runtime + + // SupportsLegacyFeatures returns true if this host can support hostports + // and bandwidth shaping. Both will either get added to CNI or dropped, + // so different implementations can choose to ignore them. + SupportsLegacyFeatures() bool +} + +// Host is an interface that plugins can use to access the kubelet. +// TODO(#35457): get rid of this backchannel to the kubelet. The scope of +// the back channel is restricted to host-ports/testing, and restricted +// to kubenet. No other network plugin wrapper needs it.
Other plugins +// only require a way to access namespace information, which they can do +// directly through the embedded NamespaceGetter. +type Host interface { + // NamespaceGetter is a getter for sandbox namespace information. + // It's the only part of this interface that isn't currently deprecated. + NamespaceGetter + + // LegacyHost contains methods that trap back into the Kubelet. + // *Do not* add more dependencies to this interface. In a post-cri world, + // network plugins will be invoked by the runtime shim, and should only + // require NamespaceGetter. + LegacyHost +} + +// NamespaceGetter is an interface to retrieve namespace information for a given +// sandboxID. Typically implemented by runtime shims that are closely coupled to +// CNI plugin wrappers like kubenet. +type NamespaceGetter interface { + // GetNetNS returns network namespace information for the given containerID. + GetNetNS(containerID string) (string, error) } // InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names. diff --git a/pkg/kubelet/network/testing/fake_host.go b/pkg/kubelet/network/testing/fake_host.go index 3eb1d1672fc..a650be1c2e1 100644 --- a/pkg/kubelet/network/testing/fake_host.go +++ b/pkg/kubelet/network/testing/fake_host.go @@ -27,11 +27,14 @@ import ( ) type fakeNetworkHost struct { + fakeNamespaceGetter kubeClient clientset.Interface + Legacy bool + Runtime *containertest.FakeRuntime } func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost { - host := &fakeNetworkHost{kubeClient: kubeClient} + host := &fakeNetworkHost{kubeClient: kubeClient, Legacy: true, Runtime: &containertest.FakeRuntime{}} return host } @@ -44,5 +47,17 @@ func (fnh *fakeNetworkHost) GetKubeClient() clientset.Interface { } func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime { - return &containertest.FakeRuntime{} + return nh.Runtime +} + +func (nh *fakeNetworkHost) SupportsLegacyFeatures() bool { + return nh.Legacy +} + +type fakeNamespaceGetter struct { + ns string +} + +func (nh *fakeNamespaceGetter) GetNetNS(containerID string) (string, error) { + return nh.ns, nil } diff --git a/pkg/kubelet/networks.go b/pkg/kubelet/networks.go index 9ab4a137e6e..74cfcb6427c 100644 --- a/pkg/kubelet/networks.go +++ b/pkg/kubelet/networks.go @@ -24,6 +24,11 @@ import ( // This just exports required functions from kubelet proper, for use by network // plugins. +// TODO(#35457): get rid of this backchannel to the kubelet. The scope of +// the back channel is restricted to host-ports/testing, and restricted +// to kubenet. No other network plugin wrapper needs it. Other plugins +// only require a way to access namespace information, which they can do +// directly through the methods implemented by criNetworkHost. type networkHost struct { kubelet *Kubelet } @@ -39,3 +44,45 @@ func (nh *networkHost) GetKubeClient() clientset.Interface { return nh.kubelet.GetKubeClient() } func (nh *networkHost) GetRuntime() kubecontainer.Runtime { return nh.kubelet.GetRuntime() } + +func (nh *networkHost) SupportsLegacyFeatures() bool { + return true +} + +// criNetworkHost implements the part of network.Host required by the +// cri (NamespaceGetter). It leeches off networkHost for all other +// methods, because networkHost is slated for deletion. +type criNetworkHost struct { + *networkHost +} + +// GetNetNS returns the network namespace of the given containerID. +// This method satisfies the network.NamespaceGetter interface for +// criNetworkHost.
It's only meant to be used from network plugins +// that are directly invoked by the kubelet (aka: legacy, pre-cri). +// Any network plugin invoked by a cri must implement NamespaceGetter +// to talk directly to the runtime instead. +func (c *criNetworkHost) GetNetNS(containerID string) (string, error) { + return c.kubelet.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "", ID: containerID}) +} + +// noOpLegacyHost implements the network.LegacyHost interface for the remote +// runtime shim by just returning empties. It doesn't support legacy features +// like host port and bandwidth shaping. +type noOpLegacyHost struct{} + +func (n *noOpLegacyHost) GetPodByName(namespace, name string) (*api.Pod, bool) { + return nil, true +} + +func (n *noOpLegacyHost) GetKubeClient() clientset.Interface { + return nil +} + +func (n *noOpLegacyHost) GetRuntime() kubecontainer.Runtime { + return nil +} + +func (nh *noOpLegacyHost) SupportsLegacyFeatures() bool { + return false +} diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index c87c1284516..17aa3d0f49c 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -36,7 +36,7 @@ type RemoteRuntimeService struct { // NewRemoteRuntimeService creates a new internalApi.RuntimeService. func NewRemoteRuntimeService(addr string, connectionTimout time.Duration) (internalApi.RuntimeService, error) { - glog.V(3).Infof("Connecting to runtime service %s", addr) + glog.Infof("Connecting to runtime service %s", addr) conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial)) if err != nil { glog.Errorf("Connect remote runtime %s failed: %v", addr, err) @@ -319,6 +319,23 @@ func (r *RemoteRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) ( return resp, nil } +// UpdateRuntimeConfig updates the config of a runtime service. The only +// update payload currently supported is the pod CIDR assigned to a node, +// and the runtime service just proxies it down to the network plugin. func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + // Response doesn't contain anything of interest. This translates to an + // Event notification to the network plugin, which can't fail, so we're + // really looking to surface destination unreachable. + _, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeApi.UpdateRuntimeConfigRequest{ + RuntimeConfig: runtimeConfig, + }) + + if err != nil { + return err + } + return nil } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 16cb0db6b8e..073c91af407 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -2179,6 +2179,12 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea return command.Run() } +// UpdatePodCIDR updates the runtimeconfig with the podCIDR. +// Currently no-ops, just implemented to satisfy the cri. +func (r *Runtime) UpdatePodCIDR(podCIDR string) error { + return nil +} + // appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState. func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState { switch state {
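Taken together, the podCIDR plumbing in this diff flows kubelet -> Runtime.UpdatePodCIDR -> RuntimeService.UpdateRuntimeConfig -> runtime shim -> NetworkPlugin.Event. The sketch below traces that hand-off end to end; it is a standalone approximation with stand-in types (fakeShim, loggingPlugin, a plain event-name string) rather than the real CRI and network package definitions.

package main

import "fmt"

// Stand-ins for the CRI types involved; the real definitions live in
// pkg/kubelet/api/v1alpha1/runtime and pkg/kubelet/network.
type networkConfig struct{ PodCidr *string }
type runtimeConfig struct{ NetworkConfig *networkConfig }

// runtimeService mirrors the single call the generic runtime manager's
// UpdatePodCIDR makes.
type runtimeService interface {
	UpdateRuntimeConfig(cfg *runtimeConfig) error
}

// networkPlugin mirrors the Event hook the shim forwards the CIDR to.
type networkPlugin interface {
	Event(name string, details map[string]interface{})
}

// fakeShim plays the dockershim's role: unpack the runtime config and
// re-publish the CIDR to its network plugin as a pod-CIDR-change event.
type fakeShim struct{ plugin networkPlugin }

func (s *fakeShim) UpdateRuntimeConfig(cfg *runtimeConfig) error {
	if cfg == nil || cfg.NetworkConfig == nil || cfg.NetworkConfig.PodCidr == nil {
		return nil // nothing to propagate
	}
	s.plugin.Event("pod-cidr-change", map[string]interface{}{"pod-cidr": *cfg.NetworkConfig.PodCidr})
	return nil
}

// loggingPlugin stands in for kubenet/CNI; it just records the event.
type loggingPlugin struct{}

func (loggingPlugin) Event(name string, details map[string]interface{}) {
	fmt.Printf("plugin event %q: %v\n", name, details)
}

// updatePodCIDR mirrors kubeGenericRuntimeManager.UpdatePodCIDR: wrap the
// CIDR in a runtime config and hand it to the runtime service.
func updatePodCIDR(rs runtimeService, cidr string) error {
	return rs.UpdateRuntimeConfig(&runtimeConfig{NetworkConfig: &networkConfig{PodCidr: &cidr}})
}

func main() {
	shim := &fakeShim{plugin: loggingPlugin{}}
	if err := updatePodCIDR(shim, "10.244.1.0/24"); err != nil {
		fmt.Println("update failed:", err)
	}
}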