Merge pull request #45280 from JulienBalestra/run-pod-inside-unique-netns

Automatic merge from submit-queue

rkt: Generate a new Network Namespace for each Pod

**What this PR does / why we need it**:

This PR concerns the kubelet with the rkt container runtime.
Currently, when a Pod stops and the kubelet restarts it, the Pod reuses the **same network namespace**, whose name is derived from its Pod UID.

When garbage collection is triggered, it deletes all the old resources and, with them, the network namespace that the restarted Pod is still using.

The Pod and all the containers inside it lose the _eth0_ interface.
I explain in more detail in #45149 how to reproduce this behavior.
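
For context, a minimal, self-contained sketch of the old naming scheme (it mirrors the removed `makePodNetnsName` helper visible in the diff below; the `main` wrapper and the sample UID are only for illustration): because the name depends solely on the Pod UID, a restarted Pod resolves to the very namespace that garbage collection is about to delete.

```go
package main

import "fmt"

// kubernetesUnitPrefix is the "k8s_" prefix shared by rkt-managed units and namespaces.
const kubernetesUnitPrefix = "k8s_"

// Old scheme: the netns name is derived only from the Pod UID, so every
// restart of the same Pod resolves to the same network namespace.
func makePodNetnsName(podUID string) string {
	return fmt.Sprintf("%s%s", kubernetesUnitPrefix, podUID)
}

func main() {
	uid := "513ce947-8f6e-4d27-8c03-99f97b78d680" // sample Pod UID
	// The first run and the restart produce the identical name, so when GC
	// deletes the "old" namespace it also deletes the one the restarted Pod uses.
	fmt.Println(makePodNetnsName(uid))
	fmt.Println(makePodNetnsName(uid))
}
```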

This PR generates a new, unique network namespace name for each new or restarting Pod.
Garbage collection then retrieves the correct network namespace from the Pod's systemd service file and removes it safely.
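
A minimal, self-contained sketch of the new scheme (the stdlib-based ID generator and the `main` wrapper are stand-ins for illustration; the real change uses a UUID helper and records the ID in the systemd unit so garbage collection can find it again):

```go
package main

import (
	"crypto/rand"
	"fmt"
)

const kubernetesUnitPrefix = "k8s_"

// New scheme: every Pod (re)start gets a freshly generated namespace ID, so a
// restarted Pod never shares a netns with one that garbage collection removes.
func generateNetworkNamespaceID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	// UUID-shaped suffix; the real code uses a proper UUID generator.
	return fmt.Sprintf("%s%x-%x-%x-%x-%x", kubernetesUnitPrefix, b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func main() {
	first := generateNetworkNamespaceID()
	afterRestart := generateNetworkNamespaceID()
	fmt.Println(first)        // e.g. k8s_a1b2c3d4-...
	fmt.Println(afterRestart) // a different namespace each time
	// The generated ID is recorded in the Pod's systemd service file
	// (X-Kubernetes section, PodNetworkNamespace key), so garbage collection
	// later runs `ip netns del <ID>` on exactly this namespace.
}
```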

**Which issue this PR fixes**:

Fixes #45149

**Special notes for your reviewer**:

Following @yifan-gu's guidelines, so expecting him for the final review.

**Release note**:

`NONE`
Kubernetes Submit Queue, 2017-05-09 15:07:34 -07:00 (committed by GitHub)
commit 76889118d7
3 changed files with 247 additions and 98 deletions


@@ -27,7 +27,9 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"k8s.io/apimachinery/pkg/types"
+	kubetypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/api/v1"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
 // fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
@@ -189,3 +191,17 @@ func (f fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
 	p, found := f.pods[uid]
 	return p, found
 }
+
+type fakeNetNs struct {
+	networkNamespace kubecontainer.ContainerID
+}
+
+func newFakeNetNs() *fakeNetNs {
+	return &fakeNetNs{
+		networkNamespace: kubecontainer.ContainerID{},
+	}
+}
+
+func (f *fakeNetNs) fromRunningUnitFiles(uid kubetypes.UID, latestPod *rktapi.Pod) (kubecontainer.ContainerID, error) {
+	return kubecontainer.ContainerID{ID: "42"}, nil
+}


@@ -87,6 +87,7 @@ const (
 	unitPodName             = "PodName"
 	unitPodNamespace        = "PodNamespace"
 	unitPodHostNetwork      = "PodHostNetwork"
+	unitPodNetworkNamespace = "PodNetworkNamespace"
 
 	k8sRktKubeletAnno      = "rkt.kubernetes.io/managed-by-kubelet"
 	k8sRktKubeletAnnoValue = "true"
@@ -182,6 +183,17 @@ type Runtime struct {
 	// requestTimeout is the timeout of rkt requests.
 	requestTimeout time.Duration
+
+	netns netNsGetter
+}
+
+// Fields of the X-Kubernetes directive of a systemd service file
+type podServiceDirective struct {
+	id               string
+	name             string
+	namespace        string
+	hostNetwork      bool
+	networkNamespace kubecontainer.ContainerID
 }
 
 var _ kubecontainer.Runtime = &Runtime{}
@@ -199,6 +211,11 @@ type cliInterface interface {
 	RunCommand(config *Config, args ...string) (result []string, err error)
 }
 
+// netNsGetter wraps the reading of systemd unit files for testing purposes
+type netNsGetter interface {
+	fromRunningUnitFiles(kubetypes.UID, *rktapi.Pod) (kubecontainer.ContainerID, error)
+}
+
 // New creates the rkt container runtime which implements the container runtime interface.
 // It will test if the rkt binary is in the $PATH, and whether we can get the
 // version of it. If so, creates the rkt container runtime, otherwise returns an error.
@@ -291,6 +308,7 @@ func New(
 	}
 
 	rkt.cli = rkt
+	rkt.netns = rkt
 
 	return rkt, nil
 }
@@ -960,7 +978,7 @@ func (r *Runtime) usesRktHostNetwork(pod *v1.Pod) bool {
 }
 
 // generateRunCommand crafts a 'rkt run-prepared' command with necessary parameters.
-func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, netnsName string) (string, error) {
+func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, networkNamespaceID string) (string, error) {
 	config := *r.config
 	privileged := true
@@ -1042,14 +1060,14 @@ func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, netnsName string) (strin
 		// TODO: switch to 'ip netns exec' once we can depend on a new
 		// enough version that doesn't have bugs like
 		// https://bugzilla.redhat.com/show_bug.cgi?id=882047
-		nsenterExec := []string{r.nsenterPath, "--net=" + netnsPathFromName(netnsName), "--"}
+		nsenterExec := []string{r.nsenterPath, "--net=" + netnsPathFromName(networkNamespaceID), "--"}
 		runPrepared = append(nsenterExec, runPrepared...)
 	}
 
 	return strings.Join(runPrepared, " "), nil
 }
 
-func (r *Runtime) cleanupPodNetwork(pod *v1.Pod) error {
+func (r *Runtime) cleanupPodNetwork(pod *v1.Pod, networkNamespace kubecontainer.ContainerID) error {
 	glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod))
 
 	// No-op if the pod is not running in a created netns.
@@ -1057,13 +1075,12 @@ func (r *Runtime) cleanupPodNetwork(pod *v1.Pod) error {
 		return nil
 	}
 
-	containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
-	teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, containerID)
+	teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, networkNamespace)
 	if teardownErr != nil {
 		glog.Error(teardownErr)
 	}
 
-	if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil {
+	if _, err := r.execer.Command("ip", "netns", "del", networkNamespace.ID).Output(); err != nil {
 		return fmt.Errorf("rkt: Failed to remove network namespace for pod %s: %v", format.Pod(pod), err)
 	}
@@ -1138,7 +1155,7 @@ func constructSyslogIdentifier(generateName string, podName string) string {
 //
 // On success, it will return a string that represents name of the unit file
 // and the runtime pod.
-func (r *Runtime) preparePod(pod *v1.Pod, podIP string, pullSecrets []v1.Secret, netnsName string) (string, *kubecontainer.Pod, error) {
+func (r *Runtime) preparePod(pod *v1.Pod, podIP string, pullSecrets []v1.Secret, networkNamespaceID string) (string, *kubecontainer.Pod, error) {
 	// Generate the appc pod manifest from the k8s pod spec.
 	manifest, err := r.makePodManifest(pod, podIP, pullSecrets)
 	if err != nil {
@@ -1179,7 +1196,7 @@ func (r *Runtime) preparePod(pod *v1.Pod, podIP string, pullSecrets []v1.Secret,
 	glog.V(4).Infof("'rkt prepare' returns %q", uuid)
 
 	// Create systemd service file for the rkt pod.
-	runPrepared, err := r.generateRunCommand(pod, uuid, netnsName)
+	runPrepared, err := r.generateRunCommand(pod, uuid, networkNamespaceID)
 	if err != nil {
 		return "", nil, fmt.Errorf("failed to generate 'rkt run-prepared' command: %v", err)
 	}
@@ -1205,6 +1222,7 @@ func (r *Runtime) preparePod(pod *v1.Pod, podIP string, pullSecrets []v1.Secret,
 		newUnitOption(unitKubernetesSection, unitPodName, pod.Name),
 		newUnitOption(unitKubernetesSection, unitPodNamespace, pod.Namespace),
 		newUnitOption(unitKubernetesSection, unitPodHostNetwork, fmt.Sprintf("%v", hostNetwork)),
+		newUnitOption(unitKubernetesSection, unitPodNetworkNamespace, networkNamespaceID),
 	}
 
 	if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil {
@@ -1267,8 +1285,10 @@ func (r *Runtime) generateEvents(runtimePod *kubecontainer.Pod, reason string, f
 	return
 }
 
-func makePodNetnsName(podID kubetypes.UID) string {
-	return fmt.Sprintf("%s%s", kubernetesUnitPrefix, string(podID))
+// Generate a network namespace name based on a new UUID, so that the Pod
+// and all of its containers run inside a dedicated, unique namespace
+func generateNetworkNamespaceUUID() kubecontainer.ContainerID {
+	return kubecontainer.ContainerID{ID: fmt.Sprintf("%s%s", kubernetesUnitPrefix, uuid.NewUUID())}
 }
 
 func netnsPathFromName(netnsName string) string {
@@ -1281,41 +1301,42 @@ func netnsPathFromName(netnsName string) string {
 // one occurred.
 //
 // If the pod is running in host network or is running using the no-op plugin, then nothing will be done.
-func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) {
+func (r *Runtime) setupPodNetwork(pod *v1.Pod) (kubecontainer.ContainerID, string, error) {
 	glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.network.PluginName(), format.Pod(pod))
 
+	var networkNamespace kubecontainer.ContainerID
+
 	// No-op if the pod is not running in a created netns.
 	if !r.shouldCreateNetns(pod) {
-		return "", "", nil
+		return networkNamespace, "", nil
 	}
 
-	netnsName := makePodNetnsName(pod.UID)
+	networkNamespace = generateNetworkNamespaceUUID()
+	glog.V(5).Infof("New network namespace %q generated for pod %s", networkNamespace.ID, format.Pod(pod))
 
-	// Create a new network namespace for the pod
-	r.execer.Command("ip", "netns", "del", netnsName).Output()
-	_, err := r.execer.Command("ip", "netns", "add", netnsName).Output()
+	// Create the network namespace for the pod
+	_, err := r.execer.Command("ip", "netns", "add", networkNamespace.ID).Output()
 	if err != nil {
-		return "", "", fmt.Errorf("failed to create pod network namespace: %v", err)
+		return networkNamespace, "", fmt.Errorf("failed to create pod network namespace: %v", err)
 	}
 
 	// Set up networking with the network plugin
-	containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
-	err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID, pod.Annotations)
+	err = r.network.SetUpPod(pod.Namespace, pod.Name, networkNamespace, pod.Annotations)
 	if err != nil {
-		return "", "", err
+		return networkNamespace, "", err
 	}
-	status, err := r.network.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID)
+	status, err := r.network.GetPodNetworkStatus(pod.Namespace, pod.Name, networkNamespace)
 	if err != nil {
-		return "", "", err
+		return networkNamespace, "", err
 	}
 
 	if r.configureHairpinMode {
-		if err = hairpin.SetUpContainerPath(netnsPathFromName(netnsName), network.DefaultInterfaceName); err != nil {
+		if err = hairpin.SetUpContainerPath(netnsPathFromName(networkNamespace.ID), network.DefaultInterfaceName); err != nil {
 			glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
 		}
 	}
 
-	return netnsName, status.IP.String(), nil
+	return networkNamespace, status.IP.String(), nil
 }
 
 // For a hostPath volume: rkt doesn't create any missing volume on the node/host so we need to create it
@@ -1341,7 +1362,7 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 	glog.V(4).Infof("Rkt starts to run pod: name %q.", format.Pod(pod))
 
 	var err error
-	var netnsName string
+	var networkNamespace kubecontainer.ContainerID
 	var podIP string
 
 	err = createHostPathVolumes(pod)
@@ -1349,13 +1370,13 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 		return err
 	}
 
-	netnsName, podIP, err = r.setupPodNetwork(pod)
+	networkNamespace, podIP, err = r.setupPodNetwork(pod)
 	if err != nil {
-		r.cleanupPodNetwork(pod)
+		r.cleanupPodNetwork(pod, networkNamespace)
 		return err
 	}
 
-	name, runtimePod, prepareErr := r.preparePod(pod, podIP, pullSecrets, netnsName)
+	name, runtimePod, prepareErr := r.preparePod(pod, podIP, pullSecrets, networkNamespace.ID)
 
 	// Set container references and generate events.
 	// If preparedPod fails, then send out 'failed' events for each container.
@@ -1375,7 +1396,7 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 	}
 
 	if prepareErr != nil {
-		r.cleanupPodNetwork(pod)
+		r.cleanupPodNetwork(pod, networkNamespace)
 		return prepareErr
 	}
@@ -1387,7 +1408,7 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 	_, err = r.systemd.RestartUnit(name, "replace", reschan)
 	if err != nil {
 		r.generateEvents(runtimePod, "Failed", err)
-		r.cleanupPodNetwork(pod)
+		r.cleanupPodNetwork(pod, networkNamespace)
 		return err
 	}
@@ -1395,7 +1416,7 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 	if res != "done" {
 		err := fmt.Errorf("Failed to restart unit %q: %s", name, res)
 		r.generateEvents(runtimePod, "Failed", err)
-		r.cleanupPodNetwork(pod)
+		r.cleanupPodNetwork(pod, networkNamespace)
 		return err
 	}
@@ -1407,7 +1428,7 @@ func (r *Runtime) RunPod(pod *v1.Pod, pullSecrets []v1.Secret) error {
 		if errKill := r.KillPod(pod, *runtimePod, nil); errKill != nil {
 			return errors.NewAggregate([]error{err, errKill})
 		}
-		r.cleanupPodNetwork(pod)
+		r.cleanupPodNetwork(pod, networkNamespace)
 		return err
 	}
@@ -1850,62 +1871,84 @@ func podIsActive(pod *rktapi.Pod) bool {
 // GetNetNS returns the network namespace path for the given container
 func (r *Runtime) GetNetNS(containerID kubecontainer.ContainerID) (string, error) {
-	// This is a slight hack, kubenet shouldn't be asking us about a container id
-	// but a pod id. This is because it knows too much about the infra container.
-	// We pretend the pod.UID is an infra container ID.
-	// This deception is only possible because we played the same trick in
+	// Currently the containerID is a UUID for a network namespace.
+	// This hack is a way to create a unique network namespace for each newly starting/restarting Pod.
+	// We can do this because we played the same trick in
 	// `networkPlugin.SetUpPod` and `networkPlugin.TearDownPod`.
-	return netnsPathFromName(makePodNetnsName(kubetypes.UID(containerID.ID))), nil
+	// See https://github.com/kubernetes/kubernetes/issues/45149
+	return netnsPathFromName(containerID.ID), nil
 }
 
 func (r *Runtime) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
 	return kubecontainer.ContainerID{ID: string(pod.ID)}, nil
 }
 
-func podDetailsFromServiceFile(serviceFilePath string) (string, string, string, bool, error) {
+func podDetailsFromServiceFile(serviceFilePath string) (podService podServiceDirective, err error) {
 	f, err := os.Open(serviceFilePath)
 	if err != nil {
-		return "", "", "", false, err
+		return podService, err
 	}
 	defer f.Close()
 
 	opts, err := unit.Deserialize(f)
 	if err != nil {
-		return "", "", "", false, err
+		return podService, err
 	}
 
-	var id, name, namespace, hostnetwork string
+	var hostnetwork, networkNamespace string
 	for _, o := range opts {
 		if o.Section != unitKubernetesSection {
 			continue
 		}
 		switch o.Name {
 		case unitPodUID:
-			id = o.Value
+			podService.id = o.Value
 		case unitPodName:
-			name = o.Value
+			podService.name = o.Value
 		case unitPodNamespace:
-			namespace = o.Value
+			podService.namespace = o.Value
 		case unitPodHostNetwork:
 			hostnetwork = o.Value
+		case unitPodNetworkNamespace:
+			networkNamespace = o.Value
 		}
 
-		if id != "" && name != "" && namespace != "" && hostnetwork != "" {
-			podHostNetwork, err := strconv.ParseBool(hostnetwork)
+		if podService.id != "" && podService.name != "" && podService.namespace != "" && hostnetwork != "" && networkNamespace != "" {
+			podService.hostNetwork, err = strconv.ParseBool(hostnetwork)
+			podService.networkNamespace = kubecontainer.ContainerID{ID: networkNamespace}
 			if err != nil {
-				return "", "", "", false, err
+				return podService, err
 			}
-			return id, name, namespace, podHostNetwork, nil
+			return podService, nil
 		}
 	}
 
-	return "", "", "", false, fmt.Errorf("failed to parse pod from file %s", serviceFilePath)
+	return podService, fmt.Errorf("failed to parse pod from file %s", serviceFilePath)
 }
 
 func (r *Runtime) DeleteContainer(containerID kubecontainer.ContainerID) error {
 	return fmt.Errorf("unimplemented")
 }
 
+// Collects all the systemd units for k8s Pods
+func (r *Runtime) getPodSystemdServiceFiles() ([]os.FileInfo, error) {
+	// Get all the current units
+	files, err := r.os.ReadDir(systemdServiceDir)
+	if err != nil {
+		glog.Errorf("rkt: Failed to read the systemd service directory: %v", err)
+		return files, err
+	}
+
+	// Keep only the k8s unit files
+	k8sSystemdServiceFiles := files[:0]
+	for _, f := range files {
+		if strings.HasPrefix(f.Name(), kubernetesUnitPrefix) {
+			k8sSystemdServiceFiles = append(k8sSystemdServiceFiles, f)
+		}
+	}
+	return k8sSystemdServiceFiles, err
+}
+
 // GarbageCollect collects the pods/containers.
 // After one GC iteration:
 // - The deleted pods will be removed.
@@ -1922,9 +1965,8 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
 	glog.V(4).Infof("rkt: Garbage collecting triggered with policy %v", gcPolicy)
 
 	// GC all inactive systemd service files and pods.
-	files, err := r.os.ReadDir(systemdServiceDir)
+	files, err := r.getPodSystemdServiceFiles()
 	if err != nil {
-		glog.Errorf("rkt: Failed to read the systemd service directory: %v", err)
 		return err
 	}
@@ -1960,7 +2002,6 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
 	// Remove any orphan service files.
 	for _, f := range files {
 		serviceName := f.Name()
-		if strings.HasPrefix(serviceName, kubernetesUnitPrefix) {
 		rktUUID := getRktUUIDFromServiceFileName(serviceName)
 		if _, ok := allPods[rktUUID]; !ok {
 			glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName)
@@ -1979,7 +2020,6 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
 			}
 		}
 	}
-	}
 
 	sort.Sort(podsByCreatedAt(inactivePods))
@@ -1997,7 +2037,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
 		}
 	}
 
-	// Remove pods and their servie files.
+	// Remove pods and their service files.
 	for _, pod := range removeCandidates {
 		if err := r.removePod(pod.Id); err != nil {
 			errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err))
@@ -2007,23 +2047,23 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
 	return errors.NewAggregate(errlist)
 }
 
-// Read kubernetes pod UUID, namespace, and name from systemd service file and
+// Read kubernetes pod UUID, namespace, netns and name from systemd service file and
 // use that to clean up any pod network that may still exist.
 func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) error {
-	id, name, namespace, hostnetwork, err := podDetailsFromServiceFile(serviceFilePath)
+	podService, err := podDetailsFromServiceFile(serviceFilePath)
 	if err != nil {
 		return err
 	}
 	return r.cleanupPodNetwork(&v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
-			UID:       kubetypes.UID(id),
-			Name:      name,
-			Namespace: namespace,
+			UID:       kubetypes.UID(podService.id),
+			Name:      podService.name,
+			Namespace: podService.namespace,
 		},
 		Spec: v1.PodSpec{
-			HostNetwork: hostnetwork,
+			HostNetwork: podService.hostNetwork,
 		},
-	})
+	}, podService.networkNamespace)
 }
 
 // removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file
@@ -2314,6 +2354,28 @@ func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcsche
 	}, nil
 }
 
+// fromRunningUnitFiles returns the network namespace of a Pod from its running
+// systemd unit; this field is stored in the X-Kubernetes directive
+func (r *Runtime) fromRunningUnitFiles(uid kubetypes.UID, latestPod *rktapi.Pod) (networkNamespace kubecontainer.ContainerID, err error) {
+	serviceFiles, err := r.getPodSystemdServiceFiles()
+	if err != nil {
+		return networkNamespace, err
+	}
+
+	for _, f := range serviceFiles {
+		fileName := f.Name()
+		if latestPod.Id == getRktUUIDFromServiceFileName(fileName) {
+			podService, err := podDetailsFromServiceFile(serviceFilePath(fileName))
+			if err != nil {
+				return networkNamespace, err
+			}
+			return podService.networkNamespace, nil
+		}
+	}
+	return networkNamespace, fmt.Errorf("pod %q containing rktPod %q: could not find a corresponding network namespace in %d systemd units", uid, latestPod.Id, len(serviceFiles))
+}
+
 // GetPodStatus returns the status for a pod specified by a given UID, name,
 // and namespace. It will attempt to find pod's information via a request to
 // the rkt api server.
@@ -2367,27 +2429,34 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube
 		}
 	}
 
+	if latestPod == nil {
+		glog.Warningf("No latestPod: rkt api-svc returns [%d]rktPods, cannot fill podStatus.IP", len(listResp.Pods))
+		return podStatus, nil
+	}
+
 	// If we are running no-op network plugin, then get the pod IP from the rkt pod status.
 	if r.network.PluginName() == network.DefaultPluginName {
-		if latestPod != nil {
 		for _, n := range latestPod.Networks {
 			if n.Name == defaultNetworkName {
 				podStatus.IP = n.Ipv4
 				break
 			}
 		}
+		return podStatus, nil
 	}
-	} else {
-		containerID := kubecontainer.ContainerID{ID: string(uid)}
-		status, err := r.network.GetPodNetworkStatus(namespace, name, containerID)
+
+	networkNamespace, err := r.netns.fromRunningUnitFiles(uid, latestPod)
+	if err != nil {
+		glog.Warningf("networkNamespace: %v", err)
+	}
+	status, err := r.network.GetPodNetworkStatus(namespace, name, networkNamespace)
 	if err != nil {
 		glog.Warningf("rkt: %v", err)
 	} else if status != nil {
-		// status can be nil when the pod is running on the host network, in which case the pod IP
-		// will be populated by the upper layer.
+		// status can be nil when the pod is running on the host network,
+		// in which case the pod IP will be populated by the upper layer.
 		podStatus.IP = status.IP.String()
 	}
-	}
 
 	return podStatus, nil
 }


@@ -583,6 +583,7 @@ func TestGetPodStatus(t *testing.T) {
 	defer ctrl.Finish()
 	fr := newFakeRktInterface()
 	fs := newFakeSystemd()
+	fnet := newFakeNetNs()
 	fnp := nettest.NewMockNetworkPlugin(ctrl)
 	fos := &containertesting.FakeOS{}
 	frh := &containertesting.FakeRuntimeHelper{}
@@ -592,6 +593,7 @@ func TestGetPodStatus(t *testing.T) {
 		runtimeHelper: frh,
 		os:            fos,
 		network:       network.NewPluginManager(fnp),
+		netns:         fnet,
 	}
 
 	ns := func(seconds int64) int64 {
@@ -808,6 +810,8 @@ func TestGetPodStatus(t *testing.T) {
 			podTimes[podFinishedMarkerPath(r.runtimeHelper.GetPodDir(tt.result.ID), pod.Id)] = tt.result.ContainerStatuses[0].FinishedAt
 		}
 
+		ctrl := gomock.NewController(t)
+
 		r.os.(*containertesting.FakeOS).StatFn = func(name string) (os.FileInfo, error) {
 			podTime, ok := podTimes[name]
 			if !ok {
@@ -817,9 +821,13 @@ func TestGetPodStatus(t *testing.T) {
 			mockFI.EXPECT().ModTime().Return(podTime)
 			return mockFI, nil
 		}
-		fnp.EXPECT().Name().Return(tt.networkPluginName)
 
-		if tt.networkPluginName == kubenet.KubenetPluginName {
+		if tt.networkPluginName == network.DefaultPluginName {
+			fnp.EXPECT().Name().Return(tt.networkPluginName)
+		}
+
+		if tt.pods != nil && tt.networkPluginName == kubenet.KubenetPluginName {
+			fnp.EXPECT().Name().Return(tt.networkPluginName)
 			if tt.result.IP != "" {
 				fnp.EXPECT().GetPodNetworkStatus("default", "guestbook", kubecontainer.ContainerID{ID: "42"}).
 					Return(&network.PodNetworkStatus{IP: net.ParseIP(tt.result.IP)}, nil)
@@ -838,7 +846,9 @@ func TestGetPodStatus(t *testing.T) {
 		assert.Equal(t, tt.result, status, testCaseHint)
 		assert.Equal(t, []string{"ListPods"}, fr.called, testCaseHint)
+		fnet.networkNamespace = kubecontainer.ContainerID{}
 		fr.CleanCalls()
+		ctrl.Finish()
 	}
 }
@@ -1800,6 +1810,10 @@ func TestGarbageCollect(t *testing.T) {
 			for _, name := range serviceFileNames {
 				mockFI := containertesting.NewMockFileInfo(ctrl)
+				// we need to specify two calls
+				// first: get all systemd units
+				// second: filter only the files with a k8s_ prefix
+				mockFI.EXPECT().Name().Return(name)
 				mockFI.EXPECT().Name().Return(name)
 				fileInfos = append(fileInfos, mockFI)
 			}
@@ -2002,3 +2016,53 @@ func TestConstructSyslogIdentifier(t *testing.T) {
 		assert.Equal(t, testCase.identifier, identifier, fmt.Sprintf("Test case #%d", i))
 	}
 }
+
+func TestGetPodSystemdServiceFiles(t *testing.T) {
+	fs := kubetesting.NewFakeOS()
+	r := &Runtime{os: fs}
+
+	testCases := []struct {
+		serviceFilesOnDisk []string
+		expected           []string
+	}{
+		{
+			[]string{"one.service", "two.service", "k8s_513ce947-8f6e-4d27-8c03-99f97b78d680.service", "k8s_184482df-8630-4d41-b84f-302684871758.service", "k8s_f4a244d8-5ec2-4f59-b7dd-c9e130d6e7a3.service", "k8s_f5aad446-5598-488f-93a4-5a27e03e7fcb.service"},
+			[]string{"k8s_513ce947-8f6e-4d27-8c03-99f97b78d680.service", "k8s_184482df-8630-4d41-b84f-302684871758.service", "k8s_f4a244d8-5ec2-4f59-b7dd-c9e130d6e7a3.service", "k8s_f5aad446-5598-488f-93a4-5a27e03e7fcb.service"},
+		},
+		{
+			[]string{"one.service", "two.service"},
+			[]string{},
+		},
+		{
+			[]string{"one.service", "k8s_513ce947-8f6e-4d27-8c03-99f97b78d680.service"},
+			[]string{"k8s_513ce947-8f6e-4d27-8c03-99f97b78d680.service"},
+		},
+	}
+
+	for i, tt := range testCases {
+		ctrl := gomock.NewController(t)
+
+		fs.ReadDirFn = func(dirname string) ([]os.FileInfo, error) {
+			serviceFileNames := tt.serviceFilesOnDisk
+			var fileInfos []os.FileInfo
+
+			for _, name := range serviceFileNames {
+				mockFI := containertesting.NewMockFileInfo(ctrl)
+				// we need to specify two calls
+				// first: get all systemd units
+				// second: filter only the files with a k8s_ prefix
+				mockFI.EXPECT().Name().Return(name)
+				mockFI.EXPECT().Name().Return(name)
+				fileInfos = append(fileInfos, mockFI)
+			}
+			return fileInfos, nil
+		}
+
+		serviceFiles, err := r.getPodSystemdServiceFiles()
+		if err != nil {
+			t.Errorf("%v", err)
+		}
+		for _, f := range serviceFiles {
+			assert.Contains(t, tt.expected, f.Name(), fmt.Sprintf("Test case #%d", i))
+		}
+	}
+}