kubelet: wire checkpoint container support through

This adds the last pieces to wire through the container checkpoint
support in the kubelet.

Signed-off-by: Adrian Reber <areber@redhat.com>
This commit is contained in:
Adrian Reber 2021-09-10 12:38:08 +00:00
parent 8c24857ba3
commit fc37a7a990
No known key found for this signature in database
GPG Key ID: 82C9378ED3C4906A
9 changed files with 369 additions and 4 deletions

View File

@ -190,6 +190,13 @@ const (
// Allows clients to request a duration for certificates issued via the Kubernetes CSR API.
CSRDuration featuregate.Feature = "CSRDuration"
// owner: @adrianreber
// kep: http://kep.k8s.io/2008
// alpha: v1.25
//
// Enables container Checkpoint support in the kubelet
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"
// owner: @jiahuif
// alpha: v1.21
// beta: v1.22
@ -846,6 +853,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
CSRDuration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
CronJobTimeZone: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -322,3 +322,13 @@ func (f *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *kubeapi.Reo
return &kubeapi.ReopenContainerLogResponse{}, nil
}
// CheckpointContainer checkpoints the given container.
func (f *RemoteRuntime) CheckpointContainer(ctx context.Context, req *kubeapi.CheckpointContainerRequest) (*kubeapi.CheckpointContainerResponse, error) {
err := f.RuntimeService.CheckpointContainer(&kubeapi.CheckpointContainerRequest{})
if err != nil {
return nil, err
}
return &kubeapi.CheckpointContainerResponse{}, nil
}

View File

@ -25,6 +25,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
sysruntime "runtime"
"sort"
"strings"
@ -60,6 +61,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/component-helpers/apimachinery/lease"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
@ -2445,6 +2447,44 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
}
}
// CheckpointContainer tries to checkpoint a container. The parameters are used to
// look up the specified container. If the container specified by the given parameters
// cannot be found an error is returned. If the container is found the container
// engine will be asked to checkpoint the given container into the kubelet's default
// checkpoint directory.
func (kl *Kubelet) CheckpointContainer(
podUID types.UID,
podFullName,
containerName string,
options *runtimeapi.CheckpointContainerRequest,
) error {
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container %v not found", containerName)
}
options.Location = filepath.Join(
kl.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s-%s-%s.tar",
podFullName,
containerName,
time.Now().Format(time.RFC3339),
),
)
options.ContainerId = string(container.ID.ID)
if err := kl.containerRuntime.CheckpointContainer(options); err != nil {
return err
}
return nil
}
// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state

View File

@ -21,10 +21,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
goruntime "runtime"
"sort"
"strconv"
"strings"
"testing"
"time"
@ -44,6 +46,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2/ktesting"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
@ -1583,6 +1586,119 @@ func TestFilterOutInactivePods(t *testing.T) {
assert.Equal(t, expected, actual)
}
func TestCheckpointContainer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
containerID := kubecontainer.ContainerID{
Type: "test",
ID: "abc1234",
}
fakePod := &containertest.FakePod{
Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "podFoo",
Namespace: "nsFoo",
Containers: []*kubecontainer.Container{
{
Name: "containerFoo",
ID: containerID,
},
},
},
}
fakeRuntime.PodList = []*containertest.FakePod{fakePod}
wrongContainerName := "wrongContainerName"
tests := []struct {
name string
containerName string
checkpointLocation string
expectedStatus error
expectedLocation string
}{
{
name: "Checkpoint with wrong container name",
containerName: wrongContainerName,
checkpointLocation: "",
expectedStatus: fmt.Errorf("container %s not found", wrongContainerName),
expectedLocation: "",
},
{
name: "Checkpoint with default checkpoint location",
containerName: fakePod.Pod.Containers[0].Name,
checkpointLocation: "",
expectedStatus: nil,
expectedLocation: filepath.Join(
kubelet.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s_%s-%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
fakePod.Pod.Containers[0].Name,
),
),
},
{
name: "Checkpoint with ignored location",
containerName: fakePod.Pod.Containers[0].Name,
checkpointLocation: "somethingThatWillBeIgnored",
expectedStatus: nil,
expectedLocation: filepath.Join(
kubelet.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s_%s-%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
fakePod.Pod.Containers[0].Name,
),
),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
options := &runtimeapi.CheckpointContainerRequest{}
if test.checkpointLocation != "" {
options.Location = test.checkpointLocation
}
status := kubelet.CheckpointContainer(
fakePod.Pod.ID,
fmt.Sprintf(
"%s_%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
),
test.containerName,
options,
)
require.Equal(t, status, test.expectedStatus)
if status != nil {
return
}
require.True(
t,
strings.HasPrefix(
options.Location,
test.expectedLocation,
),
)
require.Equal(
t,
options.ContainerId,
containerID.ID,
)
})
}
}
func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()

View File

@ -110,6 +110,7 @@ func AuthzTestCases() []AuthzTestCase {
testPaths := map[string]string{
"/attach/{podNamespace}/{podID}/{containerName}": "proxy",
"/attach/{podNamespace}/{podID}/{uid}/{containerName}": "proxy",
"/checkpoint/{podNamespace}/{podID}/{containerName}": "proxy",
"/configz": "proxy",
"/containerLogs/{podNamespace}/{podID}/{containerName}": "proxy",
"/debug/flags/v": "proxy",

View File

@ -62,6 +62,7 @@ import (
"k8s.io/component-base/logs"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -224,6 +225,7 @@ type HostInterface interface {
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
ResyncInterval() time.Duration
@ -403,6 +405,17 @@ func (s *Server) InstallDefaultHandlers() {
s.restfulCont.Handle(proberMetricsPath,
compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
// Only enable checkpoint API if the feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
s.addMetricsBucketMatcher("checkpoint")
ws = &restful.WebService{}
ws.Path("/checkpoint").Produces(restful.MIME_JSON)
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.checkpoint).
Operation("checkpoint"))
s.restfulCont.Add(ws)
}
}
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
@ -878,6 +891,83 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
proxyStream(response.ResponseWriter, request.Request, url)
}
// checkpoint handles the checkpoint API request. It checks if the requested
// podNamespace, pod and container actually exist and only then calls out
// to the runtime to actually checkpoint the container.
func (s *Server) checkpoint(request *restful.Request, response *restful.Response) {
pod, ok := s.host.GetPodByName(request.PathParameter("podNamespace"), request.PathParameter("podID"))
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
containerName := request.PathParameter("containerName")
found := false
for _, container := range pod.Spec.Containers {
if container.Name == containerName {
found = true
}
}
if !found {
for _, container := range pod.Spec.InitContainers {
if container.Name == containerName {
found = true
}
}
}
if !found && utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
for _, container := range pod.Spec.EphemeralContainers {
if container.Name == containerName {
found = true
}
}
}
if !found {
response.WriteError(
http.StatusNotFound,
fmt.Errorf("container %v does not exist", containerName),
)
return
}
options := &runtimeapi.CheckpointContainerRequest{}
// Query parameter to select an optional timeout. Without the timeout parameter
// the checkpoint command will use the default CRI timeout.
timeouts := request.Request.URL.Query()["timeout"]
if len(timeouts) > 0 {
// If the user specified one or multiple values for timeouts we
// are using the last available value.
timeout, err := strconv.ParseInt(timeouts[len(timeouts)-1], 10, 64)
if err != nil {
response.WriteError(
http.StatusNotFound,
fmt.Errorf("cannot parse value of timeout parameter"),
)
return
}
options.Timeout = timeout
}
if err := s.host.CheckpointContainer(pod.UID, kubecontainer.GetPodFullName(pod), containerName, options); err != nil {
response.WriteError(
http.StatusInternalServerError,
fmt.Errorf(
"checkpointing of %v/%v/%v failed (%v)",
request.PathParameter("podNamespace"),
request.PathParameter("podID"),
containerName,
err,
),
)
return
}
writeJSONResponse(
response,
[]byte(fmt.Sprintf("{\"items\":[\"%s\"]}", options.Location)),
)
}
// getURLRootPath trims a URL path.
// For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
// For all other paths, the first part of the path is returned.

View File

@ -53,7 +53,10 @@ import (
"k8s.io/utils/pointer"
// Do some initialization to decode the query parameters correctly.
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
@ -146,6 +149,13 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
return fk.runFunc(podFullName, uid, containerName, cmd)
}
func (fk *fakeKubelet) CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error {
if containerName == "checkpointingFailure" {
return fmt.Errorf("Returning error for test")
}
return nil
}
type fakeRuntime struct {
execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
@ -348,7 +358,8 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
fw.fakeAuth,
kubeCfg)
kubeCfg,
)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
@ -459,7 +470,6 @@ func TestServeRunInContainerWithUID(t *testing.T) {
}
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
if err != nil {
t.Fatalf("Got error POSTing: %v", err)
}
@ -546,6 +556,9 @@ func TestAuthzCoverage(t *testing.T) {
}
func TestAuthFilters(t *testing.T) {
// Enable features.ContainerCheckpoint during test
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
fw := newServerTest()
defer fw.testHTTPServer.Close()
@ -840,6 +853,78 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
}
}
func TestCheckpointContainer(t *testing.T) {
// Enable features.ContainerCheckpoint during test
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
fw := newServerTest()
defer fw.testHTTPServer.Close()
podNamespace := "other"
podName := "foo"
expectedContainerName := "baz"
// GetPodByName() should always fail
fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
return nil, false
}
t.Run("wrong pod namespace", func(t *testing.T) {
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
if err != nil {
t.Errorf("Got error POSTing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
}
})
// let GetPodByName() return a result, but our container "wrongContainerName" is not part of the Pod
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
t.Run("wrong container name", func(t *testing.T) {
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/wrongContainerName", "", nil)
if err != nil {
t.Errorf("Got error POSTing: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
}
})
// Now the checkpointing of the container fails
fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: podNamespace,
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "checkpointingFailure",
},
},
},
}, true
}
t.Run("checkpointing fails", func(t *testing.T) {
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/checkpointingFailure", "", nil)
if err != nil {
t.Errorf("Got error POSTing: %v", err)
}
defer resp.Body.Close()
assert.Equal(t, resp.StatusCode, 500)
body, _ := ioutil.ReadAll(resp.Body)
assert.Equal(t, string(body), "checkpointing of other/foo/checkpointingFailure failed (Returning error for test)")
})
// Now test a successful checkpoint succeeds
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
t.Run("checkpointing succeeds", func(t *testing.T) {
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
if err != nil {
t.Errorf("Got error POSTing: %v", err)
}
assert.Equal(t, resp.StatusCode, 200)
})
}
func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request {
req, err := http.NewRequest(method, url, nil)
if err != nil {

View File

@ -56,6 +56,8 @@ type ContainerManager interface {
// for the container. If it returns error, new container log file MUST NOT
// be created.
ReopenContainerLog(ContainerID string) error
// CheckpointContainer checkpoints a container
CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
}
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
@ -76,8 +78,6 @@ type PodSandboxManager interface {
ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
// CheckpointContainer checkpoints a container
CheckpointContainer(containerID, checkpointDir string) error
}
// ContainerStatsManager contains methods for retrieving the container

View File

@ -694,3 +694,17 @@ func (r *FakeRuntimeService) ReopenContainerLog(containerID string) error {
return nil
}
// CheckpointContainer emulates call to checkpoint a container in the FakeRuntimeService.
func (r *FakeRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
r.Lock()
defer r.Unlock()
r.Called = append(r.Called, "CheckpointContainer")
if err := r.popError("CheckpointContainer"); err != nil {
return err
}
return nil
}