diff --git a/pkg/kubelet/rkt/fake_rkt_interface_test.go b/pkg/kubelet/rkt/fake_rkt_interface_test.go index de3f700c4b0..2684fad214a 100644 --- a/pkg/kubelet/rkt/fake_rkt_interface_test.go +++ b/pkg/kubelet/rkt/fake_rkt_interface_test.go @@ -192,16 +192,28 @@ func (f fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) { return p, found } -type fakeNetNs struct { +type fakeUnitGetter struct { networkNamespace kubecontainer.ContainerID + callServices []string } -func newFakeNetNs() *fakeNetNs { - return &fakeNetNs{ +func newfakeUnitGetter() *fakeUnitGetter { + return &fakeUnitGetter{ networkNamespace: kubecontainer.ContainerID{}, } } -func (f *fakeNetNs) fromRunningUnitFiles(uid kubetypes.UID, latestPod *rktapi.Pod) (kubecontainer.ContainerID, error) { +func (f *fakeUnitGetter) getNetworkNamespace(uid kubetypes.UID, latestPod *rktapi.Pod) (kubecontainer.ContainerID, error) { return kubecontainer.ContainerID{ID: "42"}, nil } + +func (f *fakeUnitGetter) getKubernetesDirective(serviceFilePath string) (podServiceDirective, error) { + podService := podServiceDirective{ + id: "fake", + name: "fake", + namespace: "fake", + hostNetwork: true, + networkNamespace: kubecontainer.ContainerID{ID: "42"}, + } + return podService, nil +} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 2fe8b80c56d..0ce8ee24cc6 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -184,7 +184,7 @@ type Runtime struct { // requestTimeout is the timeout of rkt requests. requestTimeout time.Duration - netns netNsGetter + unitGetter unitServiceGetter } // Field of the X-Kubernetes directive of a systemd service file @@ -211,9 +211,10 @@ type cliInterface interface { RunCommand(config *Config, args ...string) (result []string, err error) } -// netNsGetter wrapps the systemd open files for testing purpose -type netNsGetter interface { - fromRunningUnitFiles(kubetypes.UID, *rktapi.Pod) (kubecontainer.ContainerID, error) +// unitServiceGetter wrapps the systemd open files for testing purpose +type unitServiceGetter interface { + getKubernetesDirective(string) (podServiceDirective, error) + getNetworkNamespace(kubetypes.UID, *rktapi.Pod) (kubecontainer.ContainerID, error) } // New creates the rkt container runtime which implements the container runtime interface. @@ -308,7 +309,7 @@ func New( } rkt.cli = rkt - rkt.netns = rkt + rkt.unitGetter = rkt return rkt, nil } @@ -1068,13 +1069,12 @@ func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, networkNamespaceID strin } func (r *Runtime) cleanupPodNetwork(pod *v1.Pod, networkNamespace kubecontainer.ContainerID) error { - glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod)) - // No-op if the pod is not running in a created netns. if !r.shouldCreateNetns(pod) { return nil } + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod)) teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, networkNamespace) if teardownErr != nil { glog.Error(teardownErr) @@ -1883,7 +1883,7 @@ func (r *Runtime) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.Conta return kubecontainer.ContainerID{ID: string(pod.ID)}, nil } -func podDetailsFromServiceFile(serviceFilePath string) (podService podServiceDirective, err error) { +func (r *Runtime) getKubernetesDirective(serviceFilePath string) (podService podServiceDirective, err error) { f, err := os.Open(serviceFilePath) if err != nil { return podService, err @@ -2006,17 +2006,8 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo if _, ok := allPods[rktUUID]; !ok { glog.V(4).Infof("rkt: No rkt pod found for service file %q, will remove it", serviceName) - if err := r.systemd.ResetFailedUnit(serviceName); err != nil { - glog.Warningf("rkt: Failed to reset the failed systemd service %q: %v", serviceName, err) - } - serviceFile := serviceFilePath(serviceName) - - // Network may not be around anymore so errors are ignored - if err := r.cleanupPodNetworkFromServiceFile(serviceFile); err != nil { - glog.Warningf("rkt: Failed to clean up pod network from service %q: %v, the network may not be around already", serviceName, err) - } - if err := r.os.Remove(serviceFile); err != nil { - errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q: %v", serviceFile, err)) + if err := r.cleanupByPodId(rktUUID); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", rktUUID, err)) } } } @@ -2039,7 +2030,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo // Remove pods and their service files. for _, pod := range removeCandidates { - if err := r.removePod(pod.Id); err != nil { + if err := r.removePod(pod); err != nil { errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up rkt pod %q: %v", pod.Id, err)) } } @@ -2050,7 +2041,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo // Read kubernetes pod UUID, namespace, netns and name from systemd service file and // use that to clean up any pod network that may still exist. func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) error { - podService, err := podDetailsFromServiceFile(serviceFilePath) + podService, err := r.unitGetter.getKubernetesDirective(serviceFilePath) if err != nil { return err } @@ -2066,30 +2057,71 @@ func (r *Runtime) cleanupPodNetworkFromServiceFile(serviceFilePath string) error }, podService.networkNamespace) } -// removePod calls 'rkt rm $UUID' to delete a rkt pod, it also remove the systemd service file -// related to the pod. -func (r *Runtime) removePod(uuid string) error { - var errlist []error - glog.V(4).Infof("rkt: GC is removing pod %q", uuid) +// Remove the touched file created by ExecStartPost in the systemd service file +func (r *Runtime) removeFinishedMarkerFile(serviceName string) error { + serviceFile := serviceFilePath(serviceName) + podDetail, err := r.unitGetter.getKubernetesDirective(serviceFile) + if err != nil { + return err + } + podDir := r.runtimeHelper.GetPodDir(kubetypes.UID(podDetail.id)) + finishedFile := podFinishedMarkerPath(podDir, getRktUUIDFromServiceFileName(serviceName)) + return r.os.Remove(finishedFile) +} - serviceName := makePodServiceFileName(uuid) +// Iter over each container in the pod to delete its termination log file +func (r *Runtime) removeTerminationFiles(pod *rktapi.Pod) (errlist []error) { + // container == app + for _, app := range pod.Apps { + for _, annotation := range app.Annotations { + if annotation.GetKey() == k8sRktTerminationMessagePathAnno { + if err := r.os.Remove(annotation.GetValue()); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove for pod %q container file %v", pod.Id, err)) + } + } + } + } + return errlist +} + +func (r *Runtime) cleanupByPodId(podID string) (errlist []error) { + serviceName := makePodServiceFileName(podID) serviceFile := serviceFilePath(serviceName) - // Network may not be around anymore so errors are ignored if err := r.cleanupPodNetworkFromServiceFile(serviceFile); err != nil { - glog.Warningf("rkt: Failed to clean up pod network from service %q: %v, the network may not be around already", serviceName, err) + errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up pod network from service %q: %v, the network may not be around already", serviceName, err)) } - if _, err := r.cli.RunCommand(nil, "rm", uuid); err != nil { - errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", uuid, err)) - } - - // GC systemd service files as well. + // GC finished marker, termination-log file, systemd service files as well. if err := r.systemd.ResetFailedUnit(serviceName); err != nil { - glog.Warningf("rkt: Failed to reset the failed systemd service %q: %v", serviceName, err) + errlist = append(errlist, fmt.Errorf("rkt: Failed to reset the failed systemd service %q: %v", serviceName, err)) + } + if err := r.removeFinishedMarkerFile(serviceName); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove finished file %q for unit %q: %v", serviceName, podID, err)) } if err := r.os.Remove(serviceFile); err != nil { - errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceFile, uuid, err)) + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove service file %q for pod %q: %v", serviceFile, podID, err)) + } + return errlist +} + +// removePod calls 'rkt rm $UUID' to delete a rkt pod, +// it also remove the systemd service file, +// the finished-* marker and the termination-log files +// related to the pod. +func (r *Runtime) removePod(pod *rktapi.Pod) error { + var errlist []error + glog.V(4).Infof("rkt: GC is removing pod %q", pod) + + if err := r.cleanupByPodId(pod.Id); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", pod.Id, err)) + } + if err := r.removeTerminationFiles(pod); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to clean up pod TerminationMessageFile %q: %v", pod.Id, err)) + } + + if _, err := r.cli.RunCommand(nil, "rm", pod.Id); err != nil { + errlist = append(errlist, fmt.Errorf("rkt: Failed to remove pod %q: %v", pod.Id, err)) } return errors.NewAggregate(errlist) @@ -2356,7 +2388,7 @@ func populateContainerStatus(pod rktapi.Pod, app rktapi.App, runtimeApp appcsche // from a running systemd unit, return the network namespace of a Pod // this field is inside the X-Kubernetes directive -func (r *Runtime) fromRunningUnitFiles(uid kubetypes.UID, latestPod *rktapi.Pod) (networkNamespace kubecontainer.ContainerID, err error) { +func (r *Runtime) getNetworkNamespace(uid kubetypes.UID, latestPod *rktapi.Pod) (networkNamespace kubecontainer.ContainerID, err error) { serviceFiles, err := r.getPodSystemdServiceFiles() if err != nil { return networkNamespace, err @@ -2365,7 +2397,7 @@ func (r *Runtime) fromRunningUnitFiles(uid kubetypes.UID, latestPod *rktapi.Pod) for _, f := range serviceFiles { fileName := f.Name() if latestPod.Id == getRktUUIDFromServiceFileName(fileName) { - podService, err := podDetailsFromServiceFile(serviceFilePath(fileName)) + podService, err := r.unitGetter.getKubernetesDirective(serviceFilePath(fileName)) if err != nil { return networkNamespace, err } @@ -2445,7 +2477,7 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube return podStatus, nil } - networkNamespace, err := r.netns.fromRunningUnitFiles(uid, latestPod) + networkNamespace, err := r.unitGetter.getNetworkNamespace(uid, latestPod) if err != nil { glog.Warningf("networkNamespace: %v", err) } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 117b143bcc8..e3955e4cf20 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -46,6 +46,7 @@ import ( nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/types" utilexec "k8s.io/kubernetes/pkg/util/exec" + "strings" ) func mustMarshalPodManifest(man *appcschema.PodManifest) []byte { @@ -583,7 +584,7 @@ func TestGetPodStatus(t *testing.T) { defer ctrl.Finish() fr := newFakeRktInterface() fs := newFakeSystemd() - fnet := newFakeNetNs() + fug := newfakeUnitGetter() fnp := nettest.NewMockNetworkPlugin(ctrl) fos := &containertesting.FakeOS{} frh := &containertesting.FakeRuntimeHelper{} @@ -593,7 +594,7 @@ func TestGetPodStatus(t *testing.T) { runtimeHelper: frh, os: fos, network: network.NewPluginManager(fnp), - netns: fnet, + unitGetter: fug, } ns := func(seconds int64) int64 { @@ -846,7 +847,7 @@ func TestGetPodStatus(t *testing.T) { assert.Equal(t, tt.result, status, testCaseHint) assert.Equal(t, []string{"ListPods"}, fr.called, testCaseHint) - fnet.networkNamespace = kubecontainer.ContainerID{} + fug.networkNamespace = kubecontainer.ContainerID{} fr.CleanCalls() ctrl.Finish() } @@ -1633,6 +1634,8 @@ func TestGarbageCollect(t *testing.T) { cli := newFakeRktCli() fakeOS := kubetesting.NewFakeOS() getter := newFakePodGetter() + fug := newfakeUnitGetter() + frh := &containertesting.FakeRuntimeHelper{} rkt := &Runtime{ os: fakeOS, @@ -1641,6 +1644,8 @@ func TestGarbageCollect(t *testing.T) { podGetter: getter, systemd: fs, containerRefManager: kubecontainer.NewRefManager(), + unitGetter: fug, + runtimeHelper: frh, } fakeApp := &rktapi.App{Name: "app-foo"} @@ -1651,7 +1656,7 @@ func TestGarbageCollect(t *testing.T) { pods []*rktapi.Pod serviceFilesOnDisk []string expectedCommands []string - expectedServiceFiles []string + expectedDeletedFiles []string }{ // All running pods, should not be gc'd. // Dead, new pods should not be gc'd. @@ -1738,7 +1743,7 @@ func TestGarbageCollect(t *testing.T) { }, []string{"k8s_dead-old.service", "k8s_deleted-foo.service", "k8s_non-existing-bar.service"}, []string{"rkt rm dead-old", "rkt rm deleted-foo"}, - []string{"/run/systemd/system/k8s_dead-old.service", "/run/systemd/system/k8s_deleted-foo.service", "/run/systemd/system/k8s_non-existing-bar.service"}, + []string{"/poddir/fake/finished-dead-old", "/poddir/fake/finished-deleted-foo", "/poddir/fake/finished-non-existing-bar", "/run/systemd/system/k8s_dead-old.service", "/run/systemd/system/k8s_deleted-foo.service", "/run/systemd/system/k8s_non-existing-bar.service"}, }, // gcPolicy.MaxContainers should be enforced. // Oldest ones are removed first. @@ -1795,7 +1800,7 @@ func TestGarbageCollect(t *testing.T) { }, []string{"k8s_dead-0.service", "k8s_dead-1.service", "k8s_dead-2.service"}, []string{"rkt rm dead-0", "rkt rm dead-1"}, - []string{"/run/systemd/system/k8s_dead-0.service", "/run/systemd/system/k8s_dead-1.service"}, + []string{"/poddir/fake/finished-dead-0", "/poddir/fake/finished-dead-1", "/run/systemd/system/k8s_dead-0.service", "/run/systemd/system/k8s_dead-1.service"}, }, } @@ -1834,14 +1839,17 @@ func TestGarbageCollect(t *testing.T) { assert.Equal(t, tt.expectedCommands, cli.cmds, testCaseHint) - sort.Sort(sortedStringList(tt.expectedServiceFiles)) + sort.Sort(sortedStringList(tt.expectedDeletedFiles)) sort.Sort(sortedStringList(fakeOS.Removes)) sort.Sort(sortedStringList(fs.resetFailedUnits)) - assert.Equal(t, tt.expectedServiceFiles, fakeOS.Removes, testCaseHint) + assert.Equal(t, tt.expectedDeletedFiles, fakeOS.Removes, testCaseHint) var expectedService []string - for _, f := range tt.expectedServiceFiles { - expectedService = append(expectedService, filepath.Base(f)) + for _, f := range tt.expectedDeletedFiles { + unit := filepath.Base(f) + if strings.HasSuffix(unit, ".service") && strings.HasPrefix(unit, kubernetesUnitPrefix) { + expectedService = append(expectedService, unit) + } } assert.Equal(t, expectedService, fs.resetFailedUnits, testCaseHint)