mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Merge pull request #127495 from zhifei92/criproxy-for-e2enode
Add cri proxy for e2e_node
This commit is contained in:
commit
d6e7aa0f18
@ -363,6 +363,14 @@ var (
|
||||
// (used for testing fine-grained SupplementalGroups control <https://kep.k8s.io/3619>)
|
||||
SupplementalGroupsPolicy = framework.WithFeature(framework.ValidFeatures.Add("SupplementalGroupsPolicy"))
|
||||
|
||||
// Owner: sig-node
|
||||
// Tests marked with this feature MUST run with the CRI Proxy configured so errors can be injected into the kubelet's CRI calls.
|
||||
// This is useful for testing how the kubelet handles various error conditions in its CRI interactions.
|
||||
// test-infra jobs:
|
||||
// - pull-kubernetes-node-e2e-cri-proxy-serial (need manual trigger)
|
||||
// - ci-kubernetes-node-e2e-cri-proxy-serial
|
||||
CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy"))
|
||||
|
||||
// Owner: sig-network
|
||||
// Marks tests that require a cluster with Topology Hints enabled.
|
||||
TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints"))
|
||||
|
@ -277,6 +277,8 @@ type NodeTestContextType struct {
|
||||
ExtraEnvs map[string]string
|
||||
// StandaloneMode indicates whether the test is running kubelet in a standalone mode.
|
||||
StandaloneMode bool
|
||||
// CriProxyEnabled indicates whether enable CRI API proxy for failure injection.
|
||||
CriProxyEnabled bool
|
||||
}
|
||||
|
||||
// CloudConfig holds the cloud configuration for e2e test suites.
|
||||
|
33
test/e2e_node/criproxy/endpoint.go
Normal file
33
test/e2e_node/criproxy/endpoint.go
Normal file
@ -0,0 +1,33 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package criproxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
)
|
||||
|
||||
const (
	// defaultUnixEndpoint is the template for proxy socket paths; %v is
	// filled with a random integer so concurrent proxies do not collide.
	defaultUnixEndpoint = "unix:///tmp/kubelet_remote_proxy_%v.sock"
)

// GenerateEndpoint returns a unique unix-socket endpoint for the gRPC CRI
// proxy server to listen on. The error result is always nil today; it is
// kept so the signature does not change if generation can fail later.
func GenerateEndpoint() (string, error) {
	// Use a random int as part of the file name to avoid socket-path
	// collisions between runs.
	return fmt.Sprintf(defaultUnixEndpoint, rand.Int()), nil
}
|
104
test/e2e_node/criproxy/proxy_image_service.go
Normal file
104
test/e2e_node/criproxy/proxy_image_service.go
Normal file
@ -0,0 +1,104 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package criproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
// Names of the CRI image-service methods, used as keys when injecting
// errors or delays into specific proxied calls (see runInjectors).
const (
	ListImages  = "ListImages"
	ImageStatus = "ImageStatus"
	PullImage   = "PullImage"
	RemoveImage = "RemoveImage"
	ImageFsInfo = "ImageFsInfo"
)
|
||||
|
||||
// ListImages lists existing images.
|
||||
func (p *RemoteRuntime) ListImages(ctx context.Context, req *kubeapi.ListImagesRequest) (*kubeapi.ListImagesResponse, error) {
|
||||
if err := p.runInjectors(ListImages); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
images, err := p.imageService.ListImages(ctx, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kubeapi.ListImagesResponse{
|
||||
Images: images,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ImageStatus returns the status of the image. If the image is not
|
||||
// present, returns a response with ImageStatusResponse.Image set to
|
||||
// nil.
|
||||
func (p *RemoteRuntime) ImageStatus(ctx context.Context, req *kubeapi.ImageStatusRequest) (*kubeapi.ImageStatusResponse, error) {
|
||||
if err := p.runInjectors(ImageStatus); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.imageService.ImageStatus(ctx, req.Image, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// PullImage pulls an image with authentication config.
|
||||
func (p *RemoteRuntime) PullImage(ctx context.Context, req *kubeapi.PullImageRequest) (*kubeapi.PullImageResponse, error) {
|
||||
if err := p.runInjectors(PullImage); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
image, err := p.imageService.PullImage(ctx, req.Image, req.Auth, req.SandboxConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kubeapi.PullImageResponse{
|
||||
ImageRef: image,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RemoveImage removes the image.
|
||||
// This call is idempotent, and must not return an error if the image has
|
||||
// already been removed.
|
||||
func (p *RemoteRuntime) RemoveImage(ctx context.Context, req *kubeapi.RemoveImageRequest) (*kubeapi.RemoveImageResponse, error) {
|
||||
if err := p.runInjectors(RemoveImage); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.imageService.RemoveImage(ctx, req.Image)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kubeapi.RemoveImageResponse{}, nil
|
||||
}
|
||||
|
||||
// ImageFsInfo returns information of the filesystem that is used to store images.
|
||||
func (p *RemoteRuntime) ImageFsInfo(ctx context.Context, req *kubeapi.ImageFsInfoRequest) (*kubeapi.ImageFsInfoResponse, error) {
|
||||
if err := p.runInjectors(ImageFsInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.imageService.ImageFsInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
523
test/e2e_node/criproxy/proxy_runtime_service.go
Normal file
523
test/e2e_node/criproxy/proxy_runtime_service.go
Normal file
@ -0,0 +1,523 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package criproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/cri-client/pkg/util"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
// Names of the CRI runtime-service methods, used as keys when injecting
// errors or delays into specific proxied calls (see runInjectors).
const (
	Version                  = "Version"
	RunPodSandbox            = "RunPodSandbox"
	StopPodSandbox           = "StopPodSandbox"
	RemovePodSandbox         = "RemovePodSandbox"
	PodSandboxStatus         = "PodSandboxStatus"
	ListPodSandbox           = "ListPodSandbox"
	CreateContainer          = "CreateContainer"
	StartContainer           = "StartContainer"
	StopContainer            = "StopContainer"
	RemoveContainer          = "RemoveContainer"
	ListContainers           = "ListContainers"
	ContainerStatus          = "ContainerStatus"
	UpdateContainerResources = "UpdateContainerResources"
	ReopenContainerLog       = "ReopenContainerLog"
	ExecSync                 = "ExecSync"
	Exec                     = "Exec"
	Attach                   = "Attach"
	PortForward              = "PortForward"
	ContainerStats           = "ContainerStats"
	ListContainerStats       = "ListContainerStats"
	PodSandboxStats          = "PodSandboxStats"
	ListPodSandboxStats      = "ListPodSandboxStats"
	UpdateRuntimeConfig      = "UpdateRuntimeConfig"
	Status                   = "Status"
	CheckpointContainer      = "CheckpointContainer"
	GetContainerEvents       = "GetContainerEvents"
	ListMetricDescriptors    = "ListMetricDescriptors"
	ListPodSandboxMetrics    = "ListPodSandboxMetrics"
	RuntimeConfig            = "RuntimeConfig"
)
|
||||
|
||||
// AddInjector registers an injector that is invoked before every proxied
// CRI call. Returning a non-nil error from the injector fails the call;
// sleeping in the injector delays it.
func (p *RemoteRuntime) AddInjector(injector func(string) error) {
	p.injectors = append(p.injectors, injector)
}

// ResetInjectors removes all registered injectors.
func (p *RemoteRuntime) ResetInjectors() {
	p.injectors = []func(string) error{}
}

// runInjectors invokes each registered injector with the name of the CRI
// method being proxied; the first injector error aborts the call and is
// returned to the kubelet as the CRI call's error.
// NOTE(review): the injectors slice is not synchronized — this assumes
// injectors are only added/reset while no CRI traffic is in flight
// (e.g. between test cases); confirm with callers.
func (p *RemoteRuntime) runInjectors(apiName string) error {
	for _, injector := range p.injectors {
		if err := injector(apiName); err != nil {
			return err
		}
	}
	return nil
}
|
||||
|
||||
// RemoteRuntime represents a proxy for a remote container runtime.
// It registers itself as both the CRI RuntimeService and ImageService gRPC
// servers, forwarding every call to the real services after running the
// registered injectors (which can fail or delay individual CRI methods).
type RemoteRuntime struct {
	server         *grpc.Server         // gRPC server the kubelet connects to
	injectors      []func(string) error // per-call fault/delay injectors
	runtimeService internalapi.RuntimeService
	imageService   internalapi.ImageManagerService
}

// NewRemoteRuntimeProxy creates a new RemoteRuntime proxying the given
// runtime and image services. The proxy is registered but not yet serving;
// call Start to begin listening.
func NewRemoteRuntimeProxy(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService) *RemoteRuntime {
	p := &RemoteRuntime{
		server:         grpc.NewServer(),
		runtimeService: runtimeService,
		imageService:   imageService,
	}
	runtimeapi.RegisterRuntimeServiceServer(p.server, p)
	runtimeapi.RegisterImageServiceServer(p.server, p)

	return p
}

// Start starts the remote runtime proxy listening on the given endpoint
// (e.g. a unix-socket URL). Serving happens on a background goroutine; a
// serve failure aborts the test run via framework.Failf.
func (p *RemoteRuntime) Start(endpoint string) error {
	l, err := util.CreateListener(endpoint)
	if err != nil {
		return fmt.Errorf("failed to listen on %q: %w", endpoint, err)
	}

	go func() {
		if err := p.server.Serve(l); err != nil {
			framework.Failf("Failed to start cri proxy : %v", err)
		}
	}()
	return nil
}

// Stop stops the fake remote runtime proxy by stopping its gRPC server.
func (p *RemoteRuntime) Stop() {
	p.server.Stop()
}
|
||||
|
||||
// Version returns the runtime name, runtime version, and runtime API version.
|
||||
func (p *RemoteRuntime) Version(ctx context.Context, req *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) {
|
||||
if err := p.runInjectors(Version); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.runtimeService.Version(ctx, req.Version)
|
||||
}
|
||||
|
||||
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
|
||||
// the sandbox is in the ready state on success.
|
||||
func (p *RemoteRuntime) RunPodSandbox(ctx context.Context, req *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
|
||||
if err := p.runInjectors(RunPodSandbox); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sandboxID, err := p.runtimeService.RunPodSandbox(ctx, req.Config, req.RuntimeHandler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.RunPodSandboxResponse{PodSandboxId: sandboxID}, nil
|
||||
}
|
||||
|
||||
// StopPodSandbox stops any running process that is part of the sandbox and
|
||||
// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
|
||||
// If there are any running containers in the sandbox, they must be forcibly
|
||||
// terminated.
|
||||
func (p *RemoteRuntime) StopPodSandbox(ctx context.Context, req *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
|
||||
if err := p.runInjectors(StopPodSandbox); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.StopPodSandbox(ctx, req.PodSandboxId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.StopPodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// RemovePodSandbox removes the sandbox. If there are any running containers
|
||||
// in the sandbox, they must be forcibly terminated and removed.
|
||||
// This call is idempotent, and must not return an error if the sandbox has
|
||||
// already been removed.
|
||||
func (p *RemoteRuntime) RemovePodSandbox(ctx context.Context, req *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) {
|
||||
if err := p.runInjectors(RemovePodSandbox); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := p.runtimeService.RemovePodSandbox(ctx, req.PodSandboxId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &runtimeapi.RemovePodSandboxResponse{}, nil
|
||||
}
|
||||
|
||||
// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
|
||||
// present, returns an error.
|
||||
func (p *RemoteRuntime) PodSandboxStatus(ctx context.Context, req *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) {
|
||||
if err := p.runInjectors(PodSandboxStatus); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.runtimeService.PodSandboxStatus(ctx, req.PodSandboxId, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ListPodSandbox returns a list of PodSandboxes.
|
||||
func (p *RemoteRuntime) ListPodSandbox(ctx context.Context, req *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) {
|
||||
if err := p.runInjectors(ListPodSandbox); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
items, err := p.runtimeService.ListPodSandbox(ctx, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListPodSandboxResponse{Items: items}, nil
|
||||
}
|
||||
|
||||
// CreateContainer creates a new container in specified PodSandbox
|
||||
func (p *RemoteRuntime) CreateContainer(ctx context.Context, req *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) {
|
||||
if err := p.runInjectors(CreateContainer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
containerID, err := p.runtimeService.CreateContainer(ctx, req.PodSandboxId, req.Config, req.SandboxConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.CreateContainerResponse{ContainerId: containerID}, nil
|
||||
}
|
||||
|
||||
// StartContainer starts the container.
|
||||
func (p *RemoteRuntime) StartContainer(ctx context.Context, req *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) {
|
||||
if err := p.runInjectors(StartContainer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.StartContainer(ctx, req.ContainerId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.StartContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// StopContainer stops a running container with a grace period (i.e., timeout).
|
||||
// This call is idempotent, and must not return an error if the container has
|
||||
// already been stopped.
|
||||
func (p *RemoteRuntime) StopContainer(ctx context.Context, req *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) {
|
||||
if err := p.runInjectors(StopContainer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.StopContainer(ctx, req.ContainerId, req.Timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.StopContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// RemoveContainer removes the container. If the container is running, the
|
||||
// container must be forcibly removed.
|
||||
// This call is idempotent, and must not return an error if the container has
|
||||
// already been removed.
|
||||
func (p *RemoteRuntime) RemoveContainer(ctx context.Context, req *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) {
|
||||
if err := p.runInjectors(RemoveContainer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.RemoveContainer(ctx, req.ContainerId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.RemoveContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// ListContainers lists all containers by filters.
|
||||
func (p *RemoteRuntime) ListContainers(ctx context.Context, req *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) {
|
||||
if err := p.runInjectors(ListContainers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
items, err := p.runtimeService.ListContainers(ctx, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListContainersResponse{Containers: items}, nil
|
||||
}
|
||||
|
||||
// ContainerStatus returns status of the container. If the container is not
|
||||
// present, returns an error.
|
||||
func (p *RemoteRuntime) ContainerStatus(ctx context.Context, req *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) {
|
||||
if err := p.runInjectors(ContainerStatus); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.runtimeService.ContainerStatus(ctx, req.ContainerId, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ExecSync runs a command in a container synchronously.
|
||||
func (p *RemoteRuntime) ExecSync(ctx context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) {
|
||||
if err := p.runInjectors(ExecSync); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var exitCode int32
|
||||
stdout, stderr, err := p.runtimeService.ExecSync(ctx, req.ContainerId, req.Cmd, time.Duration(req.Timeout)*time.Second)
|
||||
if err != nil {
|
||||
var exitError utilexec.ExitError
|
||||
ok := errors.As(err, &exitError)
|
||||
if !ok {
|
||||
return nil, err
|
||||
}
|
||||
exitCode = int32(exitError.ExitStatus())
|
||||
}
|
||||
return &runtimeapi.ExecSyncResponse{
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
ExitCode: exitCode,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Exec prepares a streaming endpoint to execute a command in the container.
|
||||
func (p *RemoteRuntime) Exec(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
|
||||
if err := p.runInjectors(Exec); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.runtimeService.Exec(ctx, req)
|
||||
}
|
||||
|
||||
// Attach prepares a streaming endpoint to attach to a running container.
|
||||
func (p *RemoteRuntime) Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
|
||||
if err := p.runInjectors(Attach); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.runtimeService.Attach(ctx, req)
|
||||
}
|
||||
|
||||
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
|
||||
func (p *RemoteRuntime) PortForward(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
|
||||
if err := p.runInjectors(PortForward); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.runtimeService.PortForward(ctx, req)
|
||||
}
|
||||
|
||||
// ContainerStats returns stats of the container. If the container does not
|
||||
// exist, the call returns an error.
|
||||
func (p *RemoteRuntime) ContainerStats(ctx context.Context, req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) {
|
||||
if err := p.runInjectors(ContainerStats); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats, err := p.runtimeService.ContainerStats(ctx, req.ContainerId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil
|
||||
}
|
||||
|
||||
// ListContainerStats returns stats of all running containers.
|
||||
func (p *RemoteRuntime) ListContainerStats(ctx context.Context, req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) {
|
||||
if err := p.runInjectors(ListContainerStats); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats, err := p.runtimeService.ListContainerStats(ctx, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil
|
||||
}
|
||||
|
||||
// PodSandboxStats returns stats of the pod. If the pod does not
|
||||
// exist, the call returns an error.
|
||||
func (p *RemoteRuntime) PodSandboxStats(ctx context.Context, req *runtimeapi.PodSandboxStatsRequest) (*runtimeapi.PodSandboxStatsResponse, error) {
|
||||
if err := p.runInjectors(PodSandboxStats); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats, err := p.runtimeService.PodSandboxStats(ctx, req.PodSandboxId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.PodSandboxStatsResponse{Stats: stats}, nil
|
||||
}
|
||||
|
||||
// ListPodSandboxStats returns stats of all running pods.
|
||||
func (p *RemoteRuntime) ListPodSandboxStats(ctx context.Context, req *runtimeapi.ListPodSandboxStatsRequest) (*runtimeapi.ListPodSandboxStatsResponse, error) {
|
||||
if err := p.runInjectors(ListPodSandboxStats); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats, err := p.runtimeService.ListPodSandboxStats(ctx, req.Filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListPodSandboxStatsResponse{Stats: stats}, nil
|
||||
}
|
||||
|
||||
// UpdateRuntimeConfig updates the runtime configuration based on the given request.
|
||||
func (p *RemoteRuntime) UpdateRuntimeConfig(ctx context.Context, req *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) {
|
||||
if err := p.runInjectors(UpdateRuntimeConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.UpdateRuntimeConfig(ctx, req.RuntimeConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.UpdateRuntimeConfigResponse{}, nil
|
||||
}
|
||||
|
||||
// Status returns the status of the runtime.
|
||||
func (p *RemoteRuntime) Status(ctx context.Context, req *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) {
|
||||
if err := p.runInjectors(Status); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.runtimeService.Status(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// UpdateContainerResources updates ContainerConfig of the container.
|
||||
func (p *RemoteRuntime) UpdateContainerResources(ctx context.Context, req *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) {
|
||||
if err := p.runInjectors(UpdateContainerResources); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.UpdateContainerResources(ctx, req.ContainerId, &runtimeapi.ContainerResources{Linux: req.Linux})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.UpdateContainerResourcesResponse{}, nil
|
||||
}
|
||||
|
||||
// ReopenContainerLog reopens the container log file.
|
||||
func (p *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *runtimeapi.ReopenContainerLogRequest) (*runtimeapi.ReopenContainerLogResponse, error) {
|
||||
if err := p.runInjectors(ReopenContainerLog); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.ReopenContainerLog(ctx, req.ContainerId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ReopenContainerLogResponse{}, nil
|
||||
}
|
||||
|
||||
// CheckpointContainer checkpoints the given container.
|
||||
func (p *RemoteRuntime) CheckpointContainer(ctx context.Context, req *runtimeapi.CheckpointContainerRequest) (*runtimeapi.CheckpointContainerResponse, error) {
|
||||
if err := p.runInjectors(CheckpointContainer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := p.runtimeService.CheckpointContainer(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.CheckpointContainerResponse{}, nil
|
||||
}
|
||||
|
||||
// GetContainerEvents streams container lifecycle events from the backing
// runtime service to the caller over the gRPC stream.
func (p *RemoteRuntime) GetContainerEvents(req *runtimeapi.GetEventsRequest, ces runtimeapi.RuntimeService_GetContainerEventsServer) error {
	if err := p.runInjectors(GetContainerEvents); err != nil {
		return err
	}

	// Capacity of the channel for receiving pod lifecycle events. This number
	// is a bit arbitrary and may be adjusted in the future.
	plegChannelCapacity := 1000
	containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, plegChannelCapacity)
	// NOTE(review): this close can run while the backing service may still be
	// producing into the channel — confirm the runtime service stops sending
	// before this function returns, otherwise the send would panic.
	defer close(containerEventsResponseCh)

	// NOTE(review): context.Background() ignores the stream's own context, so
	// the upstream subscription is not cancelled when the client disconnects —
	// TODO confirm this is intentional for a test-only proxy.
	if err := p.runtimeService.GetContainerEvents(context.Background(), containerEventsResponseCh, nil); err != nil {
		return err
	}

	// Forward events until the source channel is closed; a failed Send ends
	// the stream with an Unknown status.
	for event := range containerEventsResponseCh {
		if err := ces.Send(event); err != nil {
			return status.Errorf(codes.Unknown, "Failed to send event: %v", err)
		}
	}

	return nil
}
|
||||
|
||||
// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
|
||||
func (p *RemoteRuntime) ListMetricDescriptors(ctx context.Context, req *runtimeapi.ListMetricDescriptorsRequest) (*runtimeapi.ListMetricDescriptorsResponse, error) {
|
||||
if err := p.runInjectors(ListMetricDescriptors); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
descs, err := p.runtimeService.ListMetricDescriptors(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListMetricDescriptorsResponse{Descriptors: descs}, nil
|
||||
}
|
||||
|
||||
// ListPodSandboxMetrics retrieves the metrics for all pod sandboxes.
|
||||
func (p *RemoteRuntime) ListPodSandboxMetrics(ctx context.Context, req *runtimeapi.ListPodSandboxMetricsRequest) (*runtimeapi.ListPodSandboxMetricsResponse, error) {
|
||||
if err := p.runInjectors(ListPodSandboxMetrics); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
podMetrics, err := p.runtimeService.ListPodSandboxMetrics(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &runtimeapi.ListPodSandboxMetricsResponse{PodMetrics: podMetrics}, nil
|
||||
}
|
||||
|
||||
// RuntimeConfig returns the configuration information of the runtime.
|
||||
func (p *RemoteRuntime) RuntimeConfig(ctx context.Context, req *runtimeapi.RuntimeConfigRequest) (*runtimeapi.RuntimeConfigResponse, error) {
|
||||
if err := p.runInjectors(RuntimeConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := p.runtimeService.RuntimeConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
172
test/e2e_node/criproxy_test.go
Normal file
172
test/e2e_node/criproxy_test.go
Normal file
@ -0,0 +1,172 @@
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2enode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/test/e2e/feature"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
"k8s.io/kubernetes/test/e2e_node/criproxy"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
admissionapi "k8s.io/pod-security-admission/api"
|
||||
)
|
||||
|
||||
// Examples of using CRI proxy
|
||||
var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
|
||||
f := framework.NewDefaultFramework("cri-proxy-example")
|
||||
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
|
||||
|
||||
ginkgo.Context("Inject a pull image error exception into the CriProxy", func() {
|
||||
ginkgo.BeforeEach(func() {
|
||||
if err := resetCRIProxyInjector(); err != nil {
|
||||
ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.AfterEach(func() {
|
||||
err := resetCRIProxyInjector()
|
||||
framework.ExpectNoError(err)
|
||||
})
|
||||
|
||||
ginkgo.It("Pod failed to start due to an image pull error.", func(ctx context.Context) {
|
||||
expectedErr := fmt.Errorf("PullImage failed")
|
||||
err := addCRIProxyInjector(func(apiName string) error {
|
||||
if apiName == criproxy.PullImage {
|
||||
return expectedErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
|
||||
podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
|
||||
gomega.Expect(podErr).To(gomega.HaveOccurred())
|
||||
|
||||
eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name)
|
||||
framework.ExpectNoError(err)
|
||||
isExpectedErrMsg := strings.Contains(eventMsg, expectedErr.Error())
|
||||
gomega.Expect(isExpectedErrMsg).To(gomega.BeTrueBecause("we injected an exception into the PullImage interface of the cri proxy"))
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Context("Inject a pull image timeout exception into the CriProxy", func() {
|
||||
ginkgo.BeforeEach(func() {
|
||||
if err := resetCRIProxyInjector(); err != nil {
|
||||
ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.AfterEach(func() {
|
||||
err := resetCRIProxyInjector()
|
||||
framework.ExpectNoError(err)
|
||||
})
|
||||
|
||||
ginkgo.It("Image pull time exceeded 10 seconds", func(ctx context.Context) {
|
||||
const delayTime = 10 * time.Second
|
||||
err := addCRIProxyInjector(func(apiName string) error {
|
||||
if apiName == criproxy.PullImage {
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
|
||||
podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
|
||||
framework.ExpectNoError(podErr)
|
||||
|
||||
imagePullDuration, err := getPodImagePullDuration(ctx, f, pod.Name)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
gomega.Expect(imagePullDuration).To(gomega.BeNumerically(">=", delayTime), "PullImages should take more than 10 seconds")
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
func getFailedToPullImageMsg(ctx context.Context, f *framework.Framework, podName string) (string, error) {
|
||||
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for _, event := range events.Items {
|
||||
if event.Reason == kubeletevents.FailedToPullImage && event.InvolvedObject.Name == podName {
|
||||
return event.Message, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("failed to find FailedToPullImage event for pod: %s", podName)
|
||||
}
|
||||
|
||||
func getPodImagePullDuration(ctx context.Context, f *framework.Framework, podName string) (time.Duration, error) {
|
||||
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var startTime, endTime time.Time
|
||||
for _, event := range events.Items {
|
||||
if event.InvolvedObject.Name == podName {
|
||||
switch event.Reason {
|
||||
case kubeletevents.PullingImage:
|
||||
startTime = event.FirstTimestamp.Time
|
||||
case kubeletevents.PulledImage:
|
||||
endTime = event.FirstTimestamp.Time
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if startTime.IsZero() || endTime.IsZero() {
|
||||
return 0, fmt.Errorf("failed to find both PullingImage and PulledImage events for pod: %s", podName)
|
||||
}
|
||||
|
||||
return endTime.Sub(startTime), nil
|
||||
}
|
||||
|
||||
func newPullImageAlwaysPod() *v1.Pod {
|
||||
podName := "cri-proxy-test-" + string(uuid.NewUUID())
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Image: imageutils.GetPauseImageName(),
|
||||
Name: podName,
|
||||
ImagePullPolicy: v1.PullAlways,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return pod
|
||||
}
|
@ -48,6 +48,7 @@ import (
|
||||
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
|
||||
e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
|
||||
e2etestingmanifests "k8s.io/kubernetes/test/e2e/testing-manifests"
|
||||
"k8s.io/kubernetes/test/e2e_node/criproxy"
|
||||
"k8s.io/kubernetes/test/e2e_node/services"
|
||||
e2enodetestingmanifests "k8s.io/kubernetes/test/e2e_node/testing-manifests"
|
||||
system "k8s.io/system-validators/validators"
|
||||
@ -69,6 +70,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
e2eCriProxy *criproxy.RemoteRuntime
|
||||
e2es *services.E2EServices
|
||||
// featureGates is a map of feature names to bools that enable or disable alpha/experimental features.
|
||||
featureGates map[string]bool
|
||||
@ -109,6 +111,7 @@ func registerNodeFlags(flags *flag.FlagSet) {
|
||||
flags.Var(cliflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features.")
|
||||
flags.Var(cliflag.NewMapStringBool(&serviceFeatureGates), "service-feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features for API service.")
|
||||
flags.BoolVar(&framework.TestContext.StandaloneMode, "standalone-mode", false, "If true, starts kubelet in standalone mode.")
|
||||
flags.BoolVar(&framework.TestContext.CriProxyEnabled, "cri-proxy-enabled", false, "If true, enable CRI API proxy for failure injection.")
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -242,6 +245,22 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
|
||||
// We should mask locksmithd when provisioning the machine.
|
||||
maskLocksmithdOnCoreos()
|
||||
|
||||
if framework.TestContext.CriProxyEnabled {
|
||||
framework.Logf("Start cri proxy")
|
||||
rs, is, err := getCRIClient()
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
e2eCriProxy = criproxy.NewRemoteRuntimeProxy(rs, is)
|
||||
endpoint, err := criproxy.GenerateEndpoint()
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
err = e2eCriProxy.Start(endpoint)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
framework.TestContext.ContainerRuntimeEndpoint = endpoint
|
||||
framework.TestContext.ImageServiceEndpoint = endpoint
|
||||
}
|
||||
|
||||
if *startServices {
|
||||
// If the services are expected to stop after test, they should monitor the test process.
|
||||
// If the services are expected to keep running after test, they should not monitor the test process.
|
||||
@ -285,6 +304,11 @@ var _ = ginkgo.SynchronizedAfterSuite(func() {}, func() {
|
||||
}
|
||||
}
|
||||
|
||||
if e2eCriProxy != nil {
|
||||
framework.Logf("Stopping cri proxy service...")
|
||||
e2eCriProxy.Stop()
|
||||
}
|
||||
|
||||
klog.Infof("Tests Finished")
|
||||
})
|
||||
|
||||
|
@ -20,6 +20,8 @@ limitations under the License.
|
||||
package e2enode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
|
||||
)
|
||||
|
||||
@ -27,3 +29,21 @@ import (
|
||||
// IsCgroup2UnifiedMode reports whether the host is running with the unified
// (cgroup v2) hierarchy, as detected by the runc libcontainer cgroups package.
func IsCgroup2UnifiedMode() bool {
	return libcontainercgroups.IsCgroup2UnifiedMode()
}
|
||||
|
||||
// addCRIProxyInjector registers an injector function for the CRIProxy.
|
||||
func addCRIProxyInjector(injector func(apiName string) error) error {
|
||||
if e2eCriProxy != nil {
|
||||
e2eCriProxy.AddInjector(injector)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to add injector because the CRI Proxy is undefined")
|
||||
}
|
||||
|
||||
// resetCRIProxyInjector resets all injector functions for the CRIProxy.
|
||||
func resetCRIProxyInjector() error {
|
||||
if e2eCriProxy != nil {
|
||||
e2eCriProxy.ResetInjectors()
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to reset injector because the CRI Proxy is undefined")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user