mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #104907 from adrianreber/2021-09-10-checkpoint
Minimal checkpointing support
This commit is contained in:
commit
a655368390
@ -190,6 +190,13 @@ const (
|
||||
// Allows clients to request a duration for certificates issued via the Kubernetes CSR API.
|
||||
CSRDuration featuregate.Feature = "CSRDuration"
|
||||
|
||||
// owner: @adrianreber
|
||||
// kep: http://kep.k8s.io/2008
|
||||
// alpha: v1.25
|
||||
//
|
||||
// Enables container Checkpoint support in the kubelet
|
||||
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"
|
||||
|
||||
// owner: @jiahuif
|
||||
// alpha: v1.21
|
||||
// beta: v1.22
|
||||
@ -846,6 +853,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
CSRDuration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
|
||||
|
||||
ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
|
||||
|
||||
CronJobTimeZone: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
@ -28,4 +28,5 @@ const (
|
||||
DefaultKubeletPluginContainersDirName = "plugin-containers"
|
||||
DefaultKubeletPodResourcesDirName = "pod-resources"
|
||||
KubeletPluginsDirSELinuxLabel = "system_u:object_r:container_file_t:s0"
|
||||
DefaultKubeletCheckpointsDirName = "checkpoints"
|
||||
)
|
||||
|
@ -119,6 +119,9 @@ type Runtime interface {
|
||||
// This method just proxies a new runtimeConfig with the updated
|
||||
// CIDR value down to the runtime shim.
|
||||
UpdatePodCIDR(podCIDR string) error
|
||||
// CheckpointContainer tells the runtime to checkpoint a container
|
||||
// and store the resulting archive to the checkpoint directory.
|
||||
CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
|
||||
}
|
||||
|
||||
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
|
||||
|
@ -362,6 +362,14 @@ func (f *FakeRuntime) DeleteContainer(containerID kubecontainer.ContainerID) err
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
||||
f.CalledFunctions = append(f.CalledFunctions, "CheckpointContainer")
|
||||
return f.Err
|
||||
}
|
||||
|
||||
func (f *FakeRuntime) ImageStats() (*kubecontainer.ImageStats, error) {
|
||||
f.Lock()
|
||||
defer f.Unlock()
|
||||
|
@ -126,6 +126,20 @@ func (mr *MockRuntimeMockRecorder) APIVersion() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "APIVersion", reflect.TypeOf((*MockRuntime)(nil).APIVersion))
|
||||
}
|
||||
|
||||
// CheckpointContainer mocks base method.
|
||||
func (m *MockRuntime) CheckpointContainer(options *v10.CheckpointContainerRequest) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CheckpointContainer", options)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// CheckpointContainer indicates an expected call of CheckpointContainer.
|
||||
func (mr *MockRuntimeMockRecorder) CheckpointContainer(options interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckpointContainer", reflect.TypeOf((*MockRuntime)(nil).CheckpointContainer), options)
|
||||
}
|
||||
|
||||
// DeleteContainer mocks base method.
|
||||
func (m *MockRuntime) DeleteContainer(containerID container.ContainerID) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -322,3 +322,13 @@ func (f *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *kubeapi.Reo
|
||||
|
||||
return &kubeapi.ReopenContainerLogResponse{}, nil
|
||||
}
|
||||
|
||||
// CheckpointContainer checkpoints the given container.
|
||||
func (f *RemoteRuntime) CheckpointContainer(ctx context.Context, req *kubeapi.CheckpointContainerRequest) (*kubeapi.CheckpointContainerResponse, error) {
|
||||
err := f.RuntimeService.CheckpointContainer(&kubeapi.CheckpointContainerRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &kubeapi.CheckpointContainerResponse{}, nil
|
||||
}
|
||||
|
@ -1149,3 +1149,58 @@ func (r *remoteRuntimeService) ReopenContainerLog(containerID string) (err error
|
||||
klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog Response", "containerID", containerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckpointContainer triggers a checkpoint of the given CheckpointContainerRequest
|
||||
func (r *remoteRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
|
||||
klog.V(10).InfoS(
|
||||
"[RemoteRuntimeService] CheckpointContainer",
|
||||
"options",
|
||||
options,
|
||||
)
|
||||
if options == nil {
|
||||
return errors.New("CheckpointContainer requires non-nil CheckpointRestoreOptions parameter")
|
||||
}
|
||||
if !r.useV1API() {
|
||||
return errors.New("CheckpointContainer is only supported in the CRI v1 runtime API")
|
||||
}
|
||||
|
||||
if options.Timeout < 0 {
|
||||
return errors.New("CheckpointContainer requires the timeout value to be > 0")
|
||||
}
|
||||
|
||||
ctx, cancel := func() (context.Context, context.CancelFunc) {
|
||||
defaultTimeout := int64(r.timeout / time.Second)
|
||||
if options.Timeout > defaultTimeout {
|
||||
// The user requested a specific timeout, let's use that if it
|
||||
// is larger than the CRI default.
|
||||
return getContextWithTimeout(time.Duration(options.Timeout) * time.Second)
|
||||
}
|
||||
// If the user requested a timeout less than the
|
||||
// CRI default, let's use the CRI default.
|
||||
options.Timeout = defaultTimeout
|
||||
return getContextWithTimeout(r.timeout)
|
||||
}()
|
||||
defer cancel()
|
||||
|
||||
_, err := r.runtimeClient.CheckpointContainer(
|
||||
ctx,
|
||||
options,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
klog.ErrorS(
|
||||
err,
|
||||
"CheckpointContainer from runtime service failed",
|
||||
"containerID",
|
||||
options.ContainerId,
|
||||
)
|
||||
return err
|
||||
}
|
||||
klog.V(10).InfoS(
|
||||
"[RemoteRuntimeService] CheckpointContainer Response",
|
||||
"containerID",
|
||||
options.ContainerId,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
sysruntime "runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
@ -60,6 +61,7 @@ import (
|
||||
cloudprovider "k8s.io/cloud-provider"
|
||||
"k8s.io/component-helpers/apimachinery/lease"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
|
||||
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
@ -1229,6 +1231,7 @@ func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
|
||||
// 2. the pods directory
|
||||
// 3. the plugins directory
|
||||
// 4. the pod-resources directory
|
||||
// 5. the checkpoint directory
|
||||
func (kl *Kubelet) setupDataDirs() error {
|
||||
kl.rootDirectory = path.Clean(kl.rootDirectory)
|
||||
pluginRegistrationDir := kl.getPluginsRegistrationDir()
|
||||
@ -1251,6 +1254,11 @@ func (kl *Kubelet) setupDataDirs() error {
|
||||
if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
|
||||
return fmt.Errorf("error creating podresources directory: %v", err)
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
|
||||
if err := os.MkdirAll(kl.getCheckpointsDir(), 0700); err != nil {
|
||||
return fmt.Errorf("error creating checkpoint directory: %v", err)
|
||||
}
|
||||
}
|
||||
if selinux.GetEnabled() {
|
||||
err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
|
||||
if err != nil {
|
||||
@ -2439,6 +2447,44 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
|
||||
}
|
||||
}
|
||||
|
||||
// CheckpointContainer tries to checkpoint a container. The parameters are used to
|
||||
// look up the specified container. If the container specified by the given parameters
|
||||
// cannot be found an error is returned. If the container is found the container
|
||||
// engine will be asked to checkpoint the given container into the kubelet's default
|
||||
// checkpoint directory.
|
||||
func (kl *Kubelet) CheckpointContainer(
|
||||
podUID types.UID,
|
||||
podFullName,
|
||||
containerName string,
|
||||
options *runtimeapi.CheckpointContainerRequest,
|
||||
) error {
|
||||
container, err := kl.findContainer(podFullName, podUID, containerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if container == nil {
|
||||
return fmt.Errorf("container %v not found", containerName)
|
||||
}
|
||||
|
||||
options.Location = filepath.Join(
|
||||
kl.getCheckpointsDir(),
|
||||
fmt.Sprintf(
|
||||
"checkpoint-%s-%s-%s.tar",
|
||||
podFullName,
|
||||
containerName,
|
||||
time.Now().Format(time.RFC3339),
|
||||
),
|
||||
)
|
||||
|
||||
options.ContainerId = string(container.ID.ID)
|
||||
|
||||
if err := kl.containerRuntime.CheckpointContainer(options); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isSyncPodWorthy filters out events that are not worthy of pod syncing
|
||||
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
|
||||
// ContainerRemoved doesn't affect pod state
|
||||
|
@ -77,6 +77,12 @@ func (kl *Kubelet) getPluginDir(pluginName string) string {
|
||||
return filepath.Join(kl.getPluginsDir(), pluginName)
|
||||
}
|
||||
|
||||
// getCheckpointsDir returns a data directory name for checkpoints.
|
||||
// Checkpoints can be stored in this directory for further use.
|
||||
func (kl *Kubelet) getCheckpointsDir() string {
|
||||
return filepath.Join(kl.getRootDir(), config.DefaultKubeletCheckpointsDirName)
|
||||
}
|
||||
|
||||
// getVolumeDevicePluginsDir returns the full path to the directory under which plugin
|
||||
// directories are created. Plugins can use these directories for data that
|
||||
// they need to persist. Plugins should create subdirectories under this named
|
||||
|
@ -21,10 +21,12 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -44,6 +46,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
@ -1583,6 +1586,119 @@ func TestFilterOutInactivePods(t *testing.T) {
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestCheckpointContainer(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
kubelet := testKubelet.kubelet
|
||||
|
||||
fakeRuntime := testKubelet.fakeRuntime
|
||||
containerID := kubecontainer.ContainerID{
|
||||
Type: "test",
|
||||
ID: "abc1234",
|
||||
}
|
||||
|
||||
fakePod := &containertest.FakePod{
|
||||
Pod: &kubecontainer.Pod{
|
||||
ID: "12345678",
|
||||
Name: "podFoo",
|
||||
Namespace: "nsFoo",
|
||||
Containers: []*kubecontainer.Container{
|
||||
{
|
||||
Name: "containerFoo",
|
||||
ID: containerID,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fakeRuntime.PodList = []*containertest.FakePod{fakePod}
|
||||
wrongContainerName := "wrongContainerName"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
containerName string
|
||||
checkpointLocation string
|
||||
expectedStatus error
|
||||
expectedLocation string
|
||||
}{
|
||||
{
|
||||
name: "Checkpoint with wrong container name",
|
||||
containerName: wrongContainerName,
|
||||
checkpointLocation: "",
|
||||
expectedStatus: fmt.Errorf("container %s not found", wrongContainerName),
|
||||
expectedLocation: "",
|
||||
},
|
||||
{
|
||||
name: "Checkpoint with default checkpoint location",
|
||||
containerName: fakePod.Pod.Containers[0].Name,
|
||||
checkpointLocation: "",
|
||||
expectedStatus: nil,
|
||||
expectedLocation: filepath.Join(
|
||||
kubelet.getCheckpointsDir(),
|
||||
fmt.Sprintf(
|
||||
"checkpoint-%s_%s-%s",
|
||||
fakePod.Pod.Name,
|
||||
fakePod.Pod.Namespace,
|
||||
fakePod.Pod.Containers[0].Name,
|
||||
),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "Checkpoint with ignored location",
|
||||
containerName: fakePod.Pod.Containers[0].Name,
|
||||
checkpointLocation: "somethingThatWillBeIgnored",
|
||||
expectedStatus: nil,
|
||||
expectedLocation: filepath.Join(
|
||||
kubelet.getCheckpointsDir(),
|
||||
fmt.Sprintf(
|
||||
"checkpoint-%s_%s-%s",
|
||||
fakePod.Pod.Name,
|
||||
fakePod.Pod.Namespace,
|
||||
fakePod.Pod.Containers[0].Name,
|
||||
),
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
options := &runtimeapi.CheckpointContainerRequest{}
|
||||
if test.checkpointLocation != "" {
|
||||
options.Location = test.checkpointLocation
|
||||
}
|
||||
status := kubelet.CheckpointContainer(
|
||||
fakePod.Pod.ID,
|
||||
fmt.Sprintf(
|
||||
"%s_%s",
|
||||
fakePod.Pod.Name,
|
||||
fakePod.Pod.Namespace,
|
||||
),
|
||||
test.containerName,
|
||||
options,
|
||||
)
|
||||
require.Equal(t, status, test.expectedStatus)
|
||||
|
||||
if status != nil {
|
||||
return
|
||||
}
|
||||
|
||||
require.True(
|
||||
t,
|
||||
strings.HasPrefix(
|
||||
options.Location,
|
||||
test.expectedLocation,
|
||||
),
|
||||
)
|
||||
require.Equal(
|
||||
t,
|
||||
options.ContainerId,
|
||||
containerID.ID,
|
||||
)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
|
||||
defer testKubelet.Cleanup()
|
||||
|
@ -324,3 +324,12 @@ func (in instrumentedImageManagerService) ImageFsInfo() ([]*runtimeapi.Filesyste
|
||||
recordError(operation, err)
|
||||
return fsInfo, nil
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
|
||||
const operation = "checkpoint_container"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
||||
err := in.service.CheckpointContainer(options)
|
||||
recordError(operation, err)
|
||||
return err
|
||||
}
|
||||
|
@ -1100,3 +1100,7 @@ func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (m *kubeGenericRuntimeManager) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
|
||||
return m.runtimeService.CheckpointContainer(options)
|
||||
}
|
||||
|
@ -110,6 +110,7 @@ func AuthzTestCases() []AuthzTestCase {
|
||||
testPaths := map[string]string{
|
||||
"/attach/{podNamespace}/{podID}/{containerName}": "proxy",
|
||||
"/attach/{podNamespace}/{podID}/{uid}/{containerName}": "proxy",
|
||||
"/checkpoint/{podNamespace}/{podID}/{containerName}": "proxy",
|
||||
"/configz": "proxy",
|
||||
"/containerLogs/{podNamespace}/{podID}/{containerName}": "proxy",
|
||||
"/debug/flags/v": "proxy",
|
||||
|
@ -62,6 +62,7 @@ import (
|
||||
"k8s.io/component-base/logs"
|
||||
compbasemetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
|
||||
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
@ -224,6 +225,7 @@ type HostInterface interface {
|
||||
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
|
||||
GetRunningPods() ([]*v1.Pod, error)
|
||||
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
||||
CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error
|
||||
GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
|
||||
ServeLogs(w http.ResponseWriter, req *http.Request)
|
||||
ResyncInterval() time.Duration
|
||||
@ -403,6 +405,17 @@ func (s *Server) InstallDefaultHandlers() {
|
||||
s.restfulCont.Handle(proberMetricsPath,
|
||||
compbasemetrics.HandlerFor(p, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
|
||||
)
|
||||
|
||||
// Only enable checkpoint API if the feature is enabled
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
|
||||
s.addMetricsBucketMatcher("checkpoint")
|
||||
ws = &restful.WebService{}
|
||||
ws.Path("/checkpoint").Produces(restful.MIME_JSON)
|
||||
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
|
||||
To(s.checkpoint).
|
||||
Operation("checkpoint"))
|
||||
s.restfulCont.Add(ws)
|
||||
}
|
||||
}
|
||||
|
||||
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
||||
@ -878,6 +891,83 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
|
||||
proxyStream(response.ResponseWriter, request.Request, url)
|
||||
}
|
||||
|
||||
// checkpoint handles the checkpoint API request. It checks if the requested
|
||||
// podNamespace, pod and container actually exist and only then calls out
|
||||
// to the runtime to actually checkpoint the container.
|
||||
func (s *Server) checkpoint(request *restful.Request, response *restful.Response) {
|
||||
pod, ok := s.host.GetPodByName(request.PathParameter("podNamespace"), request.PathParameter("podID"))
|
||||
if !ok {
|
||||
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||
return
|
||||
}
|
||||
|
||||
containerName := request.PathParameter("containerName")
|
||||
|
||||
found := false
|
||||
for _, container := range pod.Spec.Containers {
|
||||
if container.Name == containerName {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
for _, container := range pod.Spec.InitContainers {
|
||||
if container.Name == containerName {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found && utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
|
||||
for _, container := range pod.Spec.EphemeralContainers {
|
||||
if container.Name == containerName {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
response.WriteError(
|
||||
http.StatusNotFound,
|
||||
fmt.Errorf("container %v does not exist", containerName),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
options := &runtimeapi.CheckpointContainerRequest{}
|
||||
// Query parameter to select an optional timeout. Without the timeout parameter
|
||||
// the checkpoint command will use the default CRI timeout.
|
||||
timeouts := request.Request.URL.Query()["timeout"]
|
||||
if len(timeouts) > 0 {
|
||||
// If the user specified one or multiple values for timeouts we
|
||||
// are using the last available value.
|
||||
timeout, err := strconv.ParseInt(timeouts[len(timeouts)-1], 10, 64)
|
||||
if err != nil {
|
||||
response.WriteError(
|
||||
http.StatusNotFound,
|
||||
fmt.Errorf("cannot parse value of timeout parameter"),
|
||||
)
|
||||
return
|
||||
}
|
||||
options.Timeout = timeout
|
||||
}
|
||||
|
||||
if err := s.host.CheckpointContainer(pod.UID, kubecontainer.GetPodFullName(pod), containerName, options); err != nil {
|
||||
response.WriteError(
|
||||
http.StatusInternalServerError,
|
||||
fmt.Errorf(
|
||||
"checkpointing of %v/%v/%v failed (%v)",
|
||||
request.PathParameter("podNamespace"),
|
||||
request.PathParameter("podID"),
|
||||
containerName,
|
||||
err,
|
||||
),
|
||||
)
|
||||
return
|
||||
}
|
||||
writeJSONResponse(
|
||||
response,
|
||||
[]byte(fmt.Sprintf("{\"items\":[\"%s\"]}", options.Location)),
|
||||
)
|
||||
}
|
||||
|
||||
// getURLRootPath trims a URL path.
|
||||
// For paths in the format of "/metrics/xxx", "metrics/xxx" is returned;
|
||||
// For all other paths, the first part of the path is returned.
|
||||
|
@ -53,7 +53,10 @@ import (
|
||||
"k8s.io/utils/pointer"
|
||||
|
||||
// Do some initialization to decode the query parameters correctly.
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
|
||||
@ -146,6 +149,13 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
|
||||
return fk.runFunc(podFullName, uid, containerName, cmd)
|
||||
}
|
||||
|
||||
func (fk *fakeKubelet) CheckpointContainer(podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error {
|
||||
if containerName == "checkpointingFailure" {
|
||||
return fmt.Errorf("Returning error for test")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeRuntime struct {
|
||||
execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
|
||||
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
|
||||
@ -348,7 +358,8 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
|
||||
fw.fakeKubelet,
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
|
||||
fw.fakeAuth,
|
||||
kubeCfg)
|
||||
kubeCfg,
|
||||
)
|
||||
fw.serverUnderTest = &server
|
||||
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
|
||||
return fw
|
||||
@ -459,7 +470,6 @@ func TestServeRunInContainerWithUID(t *testing.T) {
|
||||
}
|
||||
|
||||
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Got error POSTing: %v", err)
|
||||
}
|
||||
@ -546,6 +556,9 @@ func TestAuthzCoverage(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAuthFilters(t *testing.T) {
|
||||
// Enable features.ContainerCheckpoint during test
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
|
||||
|
||||
fw := newServerTest()
|
||||
defer fw.testHTTPServer.Close()
|
||||
|
||||
@ -840,6 +853,78 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckpointContainer(t *testing.T) {
|
||||
// Enable features.ContainerCheckpoint during test
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
|
||||
|
||||
fw := newServerTest()
|
||||
defer fw.testHTTPServer.Close()
|
||||
podNamespace := "other"
|
||||
podName := "foo"
|
||||
expectedContainerName := "baz"
|
||||
// GetPodByName() should always fail
|
||||
fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
|
||||
return nil, false
|
||||
}
|
||||
t.Run("wrong pod namespace", func(t *testing.T) {
|
||||
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
|
||||
if err != nil {
|
||||
t.Errorf("Got error POSTing: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusNotFound {
|
||||
t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
|
||||
}
|
||||
})
|
||||
// let GetPodByName() return a result, but our container "wrongContainerName" is not part of the Pod
|
||||
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
|
||||
t.Run("wrong container name", func(t *testing.T) {
|
||||
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/wrongContainerName", "", nil)
|
||||
if err != nil {
|
||||
t.Errorf("Got error POSTing: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusNotFound {
|
||||
t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
|
||||
}
|
||||
})
|
||||
// Now the checkpointing of the container fails
|
||||
fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: podNamespace,
|
||||
Name: podName,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "checkpointingFailure",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, true
|
||||
}
|
||||
t.Run("checkpointing fails", func(t *testing.T) {
|
||||
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/checkpointingFailure", "", nil)
|
||||
if err != nil {
|
||||
t.Errorf("Got error POSTing: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
assert.Equal(t, resp.StatusCode, 500)
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
assert.Equal(t, string(body), "checkpointing of other/foo/checkpointingFailure failed (Returning error for test)")
|
||||
})
|
||||
// Now test a successful checkpoint succeeds
|
||||
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
|
||||
t.Run("checkpointing succeeds", func(t *testing.T) {
|
||||
resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
|
||||
if err != nil {
|
||||
t.Errorf("Got error POSTing: %v", err)
|
||||
}
|
||||
assert.Equal(t, resp.StatusCode, 200)
|
||||
})
|
||||
}
|
||||
|
||||
func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request {
|
||||
req, err := http.NewRequest(method, url, nil)
|
||||
if err != nil {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -114,6 +114,9 @@ service RuntimeService {
|
||||
|
||||
// Status returns the status of the runtime.
|
||||
rpc Status(StatusRequest) returns (StatusResponse) {}
|
||||
|
||||
// CheckpointContainer checkpoints a container
|
||||
rpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}
|
||||
}
|
||||
|
||||
// ImageService defines the public APIs for managing images.
|
||||
@ -1543,3 +1546,16 @@ message ReopenContainerLogRequest {
|
||||
|
||||
message ReopenContainerLogResponse{
|
||||
}
|
||||
|
||||
message CheckpointContainerRequest {
|
||||
// ID of the container to be checkpointed.
|
||||
string container_id = 1;
|
||||
// Location of the checkpoint archive used for export
|
||||
string location = 2;
|
||||
// Timeout in seconds for the checkpoint to complete.
|
||||
// Timeout of zero means to use the CRI default.
|
||||
// Timeout > 0 means to use the user specified timeout.
|
||||
int64 timeout = 3;
|
||||
}
|
||||
|
||||
message CheckpointContainerResponse {}
|
||||
|
@ -56,6 +56,8 @@ type ContainerManager interface {
|
||||
// for the container. If it returns error, new container log file MUST NOT
|
||||
// be created.
|
||||
ReopenContainerLog(ContainerID string) error
|
||||
// CheckpointContainer checkpoints a container
|
||||
CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
|
||||
}
|
||||
|
||||
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
|
||||
|
@ -694,3 +694,17 @@ func (r *FakeRuntimeService) ReopenContainerLog(containerID string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckpointContainer emulates call to checkpoint a container in the FakeRuntimeService.
|
||||
func (r *FakeRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
r.Called = append(r.Called, "CheckpointContainer")
|
||||
|
||||
if err := r.popError("CheckpointContainer"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
155
test/e2e_node/checkpoint_container.go
Normal file
155
test/e2e_node/checkpoint_container.go
Normal file
@ -0,0 +1,155 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2enode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
admissionapi "k8s.io/pod-security-admission/api"
|
||||
)
|
||||
|
||||
const (
|
||||
// timeout for proxy requests.
|
||||
proxyTimeout = 2 * time.Minute
|
||||
)
|
||||
|
||||
// proxyPostRequest performs a post on a node proxy endpoint given the nodename and rest client.
|
||||
func proxyPostRequest(c clientset.Interface, node, endpoint string, port int) (restclient.Result, error) {
|
||||
// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. #22165
|
||||
var result restclient.Result
|
||||
finished := make(chan struct{}, 1)
|
||||
go func() {
|
||||
result = c.CoreV1().RESTClient().Post().
|
||||
Resource("nodes").
|
||||
SubResource("proxy").
|
||||
Name(fmt.Sprintf("%v:%v", node, port)).
|
||||
Suffix(endpoint).
|
||||
Do(context.TODO())
|
||||
|
||||
finished <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-finished:
|
||||
return result, nil
|
||||
case <-time.After(proxyTimeout):
|
||||
return restclient.Result{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
var _ = SIGDescribe("Checkpoint Container [NodeFeature:CheckpointContainer]", func() {
|
||||
f := framework.NewDefaultFramework("checkpoint-container-test")
|
||||
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
|
||||
ginkgo.It("will checkpoint a container out of a pod", func() {
|
||||
ginkgo.By("creating a target pod")
|
||||
podClient := f.PodClient()
|
||||
pod := podClient.CreateSync(&v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "checkpoint-container-pod"},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "test-container-1",
|
||||
Image: imageutils.GetE2EImage(imageutils.BusyBox),
|
||||
Command: []string{"/bin/sleep"},
|
||||
Args: []string{"10000"},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
p, err := podClient.Get(
|
||||
context.TODO(),
|
||||
pod.Name,
|
||||
metav1.GetOptions{},
|
||||
)
|
||||
|
||||
framework.ExpectNoError(err)
|
||||
isReady, err := testutils.PodRunningReady(p)
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectEqual(
|
||||
isReady,
|
||||
true,
|
||||
"pod should be ready",
|
||||
)
|
||||
|
||||
framework.Logf(
|
||||
"About to checkpoint container %q on %q",
|
||||
pod.Spec.Containers[0].Name,
|
||||
pod.Spec.NodeName,
|
||||
)
|
||||
result, err := proxyPostRequest(
|
||||
f.ClientSet,
|
||||
pod.Spec.NodeName,
|
||||
fmt.Sprintf(
|
||||
"checkpoint/%s/%s/%s",
|
||||
f.Namespace.Name,
|
||||
pod.Name,
|
||||
pod.Spec.Containers[0].Name,
|
||||
),
|
||||
framework.KubeletPort,
|
||||
)
|
||||
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
err = result.Error()
|
||||
if err != nil {
|
||||
statusError, ok := err.(*apierrors.StatusError)
|
||||
if !ok {
|
||||
framework.Failf("got error %#v, expected StatusError", err)
|
||||
}
|
||||
// If we are testing against a kubelet with ContainerCheckpoint == false
|
||||
// we should get a 404. So a 404 is (also) a good sign.
|
||||
if (int(statusError.ErrStatus.Code)) == http.StatusNotFound {
|
||||
ginkgo.Skip("Feature 'ContainerCheckpoint' is not enabled and not available")
|
||||
return
|
||||
}
|
||||
|
||||
// If the container engine has not implemented the Checkpoint CRI API
|
||||
// we will get 500 and a message with
|
||||
// '(rpc error: code = Unimplemented desc = unknown method CheckpointContainer'
|
||||
if (int(statusError.ErrStatus.Code)) == http.StatusInternalServerError {
|
||||
if strings.Contains(
|
||||
statusError.ErrStatus.Message,
|
||||
"(rpc error: code = Unimplemented desc = unknown method CheckpointContainer",
|
||||
) {
|
||||
ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
|
||||
return
|
||||
}
|
||||
}
|
||||
framework.Failf("Unexpected status code (%d) during 'CheckpointContainer'", statusError.ErrStatus.Code)
|
||||
}
|
||||
|
||||
framework.ExpectNoError(err)
|
||||
// TODO: once a container engine implements the Checkpoint CRI API this needs
|
||||
// to be extended to handle it.
|
||||
//
|
||||
// Checkpointing actually worked. Verify that the checkpoint exists and that
|
||||
// it is a checkpoint.
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue
Block a user