Merge pull request #125663 from saschagrunert/oci-volumesource-kubelet

[KEP-4639] Add `ImageVolumeSource` implementation
This commit is contained in:
Kubernetes Prow Robot 2024-07-22 15:48:33 -07:00 committed by GitHub
commit 581a073dc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 563 additions and 76 deletions

View File

@ -22,6 +22,7 @@ import (
"k8s.io/utils/exec" "k8s.io/utils/exec"
// Volume plugins // Volume plugins
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/configmap" "k8s.io/kubernetes/pkg/volume/configmap"
"k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/csi"
@ -31,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/volume/flexvolume" "k8s.io/kubernetes/pkg/volume/flexvolume"
"k8s.io/kubernetes/pkg/volume/git_repo" "k8s.io/kubernetes/pkg/volume/git_repo"
"k8s.io/kubernetes/pkg/volume/hostpath" "k8s.io/kubernetes/pkg/volume/hostpath"
"k8s.io/kubernetes/pkg/volume/image"
"k8s.io/kubernetes/pkg/volume/iscsi" "k8s.io/kubernetes/pkg/volume/iscsi"
"k8s.io/kubernetes/pkg/volume/local" "k8s.io/kubernetes/pkg/volume/local"
"k8s.io/kubernetes/pkg/volume/nfs" "k8s.io/kubernetes/pkg/volume/nfs"
@ -65,6 +67,9 @@ func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlu
allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...) allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...) allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, csi.ProbeVolumePlugins()...)
if featureGate.Enabled(features.ImageVolume) {
allPlugins = append(allPlugins, image.ProbeVolumePlugins()...)
}
return allPlugins, nil return allPlugins, nil
} }

View File

@ -46,7 +46,7 @@ type HandlerRunner interface {
// RuntimeHelper wraps kubelet to make container runtime // RuntimeHelper wraps kubelet to make container runtime
// able to get necessary informations like the RunContainerOptions, DNS settings, Host IP. // able to get necessary informations like the RunContainerOptions, DNS settings, Host IP.
type RuntimeHelper interface { type RuntimeHelper interface {
GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string) (contOpts *RunContainerOptions, cleanupAction func(), err error) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, imageVolumes ImageVolumes) (contOpts *RunContainerOptions, cleanupAction func(), err error)
GetPodDNS(pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error) GetPodDNS(pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error)
// GetPodCgroupParent returns the CgroupName identifier, and its literal cgroupfs form on the host // GetPodCgroupParent returns the CgroupName identifier, and its literal cgroupfs form on the host
// of a pod. // of a pod.

View File

@ -135,6 +135,8 @@ type Runtime interface {
ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error)
// ListPodSandboxMetrics retrieves the metrics for all pod sandboxes. // ListPodSandboxMetrics retrieves the metrics for all pod sandboxes.
ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error)
// GetContainerStatus returns the status for the container.
GetContainerStatus(ctx context.Context, id ContainerID) (*Status, error)
} }
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the // StreamingRuntime is the interface implemented by runtimes that handle the serving of the
@ -374,6 +376,8 @@ type Status struct {
Resources *ContainerResources Resources *ContainerResources
// User identity information of the first process of this container // User identity information of the first process of this container
User *ContainerUser User *ContainerUser
// Mounts are the volume mounts of the container
Mounts []Mount
} }
// ContainerUser represents user identity information // ContainerUser represents user identity information
@ -466,8 +470,13 @@ type Mount struct {
SELinuxRelabel bool SELinuxRelabel bool
// Requested propagation mode // Requested propagation mode
Propagation runtimeapi.MountPropagation Propagation runtimeapi.MountPropagation
// Image is set if an OCI volume as image ID or digest should get mounted (special case).
Image *runtimeapi.ImageSpec
} }
// ImageVolumes is a map of image specs by volume name.
type ImageVolumes = map[string]*runtimeapi.ImageSpec
// PortMapping contains information about the port mapping. // PortMapping contains information about the port mapping.
type PortMapping struct { type PortMapping struct {
// Protocol of the port mapping. // Protocol of the port mapping.

View File

@ -516,3 +516,11 @@ func (f *FakeContainerCommandRunner) RunInContainer(_ context.Context, container
return []byte(f.Stdout), f.Err return []byte(f.Stdout), f.Err
} }
func (f *FakeRuntime) GetContainerStatus(_ context.Context, _ kubecontainer.ContainerID) (status *kubecontainer.Status, err error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetContainerStatus")
return nil, f.Err
}

View File

@ -36,7 +36,7 @@ type FakeRuntimeHelper struct {
Err error Err error
} }
func (f *FakeRuntimeHelper) GenerateRunContainerOptions(_ context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string) (*kubecontainer.RunContainerOptions, func(), error) { func (f *FakeRuntimeHelper) GenerateRunContainerOptions(_ context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (*kubecontainer.RunContainerOptions, func(), error) {
var opts kubecontainer.RunContainerOptions var opts kubecontainer.RunContainerOptions
if len(container.TerminationMessagePath) != 0 { if len(container.TerminationMessagePath) != 0 {
opts.PodContainerDir = f.PodContainerDir opts.PodContainerDir = f.PodContainerDir

View File

@ -14,6 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by mockery v2.40.3. DO NOT EDIT. // Code generated by mockery v2.40.3. DO NOT EDIT.
package testing package testing
@ -358,6 +374,65 @@ func (_c *MockRuntime_GetContainerLogs_Call) RunAndReturn(run func(context.Conte
return _c return _c
} }
// GetContainerStatus provides a mock function with given fields: ctx, id
func (_m *MockRuntime) GetContainerStatus(ctx context.Context, id container.ContainerID) (*container.Status, error) {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for GetContainerStatus")
}
var r0 *container.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, container.ContainerID) (*container.Status, error)); ok {
return rf(ctx, id)
}
if rf, ok := ret.Get(0).(func(context.Context, container.ContainerID) *container.Status); ok {
r0 = rf(ctx, id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*container.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, container.ContainerID) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRuntime_GetContainerStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContainerStatus'
type MockRuntime_GetContainerStatus_Call struct {
*mock.Call
}
// GetContainerStatus is a helper method to define mock.On call
// - ctx context.Context
// - id container.ContainerID
func (_e *MockRuntime_Expecter) GetContainerStatus(ctx interface{}, id interface{}) *MockRuntime_GetContainerStatus_Call {
return &MockRuntime_GetContainerStatus_Call{Call: _e.mock.On("GetContainerStatus", ctx, id)}
}
func (_c *MockRuntime_GetContainerStatus_Call) Run(run func(ctx context.Context, id container.ContainerID)) *MockRuntime_GetContainerStatus_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(container.ContainerID))
})
return _c
}
func (_c *MockRuntime_GetContainerStatus_Call) Return(_a0 *container.Status, _a1 error) *MockRuntime_GetContainerStatus_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockRuntime_GetContainerStatus_Call) RunAndReturn(run func(context.Context, container.ContainerID) (*container.Status, error)) *MockRuntime_GetContainerStatus_Call {
_c.Call.Return(run)
return _c
}
// GetImageRef provides a mock function with given fields: ctx, image // GetImageRef provides a mock function with given fields: ctx, image
func (_m *MockRuntime) GetImageRef(ctx context.Context, image container.ImageSpec) (string, error) { func (_m *MockRuntime) GetImageRef(ctx context.Context, image container.ImageSpec) (string, error) {
ret := _m.Called(ctx, image) ret := _m.Called(ctx, image)

View File

@ -248,6 +248,10 @@ func (im *realImageGCManager) detectImages(ctx context.Context, detectTime time.
// Make a set of images in use by containers. // Make a set of images in use by containers.
for _, pod := range pods { for _, pod := range pods {
for _, container := range pod.Containers { for _, container := range pod.Containers {
if err := im.handleImageVolumes(ctx, imagesInUse, container, pod, images); err != nil {
return imagesInUse, err
}
if !isRuntimeClassInImageCriAPIEnabled { if !isRuntimeClassInImageCriAPIEnabled {
klog.V(5).InfoS("Container uses image", "pod", klog.KRef(pod.Namespace, pod.Name), "containerName", container.Name, "containerImage", container.Image, "imageID", container.ImageID, "imageRef", container.ImageRef) klog.V(5).InfoS("Container uses image", "pod", klog.KRef(pod.Namespace, pod.Name), "containerName", container.Name, "containerImage", container.Image, "imageID", container.ImageID, "imageRef", container.ImageRef)
imagesInUse.Insert(container.ImageID) imagesInUse.Insert(container.ImageID)
@ -308,6 +312,29 @@ func (im *realImageGCManager) detectImages(ctx context.Context, detectTime time.
return imagesInUse, nil return imagesInUse, nil
} }
// handleImageVolumes ensures that image volumes are considered as images in use.
func (im *realImageGCManager) handleImageVolumes(ctx context.Context, imagesInUse sets.Set[string], container *container.Container, pod *container.Pod, images []container.Image) error {
if !utilfeature.DefaultFeatureGate.Enabled(features.ImageVolume) {
return nil
}
status, err := im.runtime.GetContainerStatus(ctx, container.ID)
if err != nil {
return fmt.Errorf("get container status: %w", err)
}
for _, mount := range status.Mounts {
for _, image := range images {
if mount.Image != nil && mount.Image.Image == image.ID {
klog.V(5).InfoS("Container uses image as mount", "pod", klog.KRef(pod.Namespace, pod.Name), "containerName", container.Name, "imageID", image.ID)
imagesInUse.Insert(image.ID)
}
}
}
return nil
}
func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.Time) error { func (im *realImageGCManager) GarbageCollect(ctx context.Context, beganGC time.Time) error {
ctx, otelSpan := im.tracer.Start(ctx, "Images/GarbageCollect") ctx, otelSpan := im.tracer.Start(ctx, "Images/GarbageCollect")
defer otelSpan.End() defer otelSpan.End()

View File

@ -75,13 +75,13 @@ func NewImageManager(recorder record.EventRecorder, imageService kubecontainer.I
// shouldPullImage returns whether we should pull an image according to // shouldPullImage returns whether we should pull an image according to
// the presence and pull policy of the image. // the presence and pull policy of the image.
func shouldPullImage(container *v1.Container, imagePresent bool) bool { func shouldPullImage(pullPolicy v1.PullPolicy, imagePresent bool) bool {
if container.ImagePullPolicy == v1.PullNever { if pullPolicy == v1.PullNever {
return false return false
} }
if container.ImagePullPolicy == v1.PullAlways || if pullPolicy == v1.PullAlways ||
(container.ImagePullPolicy == v1.PullIfNotPresent && (!imagePresent)) { (pullPolicy == v1.PullIfNotPresent && (!imagePresent)) {
return true return true
} }
@ -89,28 +89,24 @@ func shouldPullImage(container *v1.Container, imagePresent bool) bool {
} }
// records an event using ref, event msg. log to glog using prefix, msg, logFn // records an event using ref, event msg. log to glog using prefix, msg, logFn
func (m *imageManager) logIt(ref *v1.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { func (m *imageManager) logIt(objRef *v1.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil { if objRef != nil {
m.recorder.Event(ref, eventtype, event, msg) m.recorder.Event(objRef, eventtype, event, msg)
} else { } else {
logFn(fmt.Sprint(prefix, " ", msg)) logFn(fmt.Sprint(prefix, " ", msg))
} }
} }
// EnsureImageExists pulls the image for the specified pod and container, and returns // EnsureImageExists pulls the image for the specified pod and imgRef, and returns
// (imageRef, error message, error). // (imageRef, error message, error).
func (m *imageManager) EnsureImageExists(ctx context.Context, pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string) (string, string, error) { func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, imgRef string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (imageRef, message string, err error) {
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, container.Image) logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, imgRef)
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
}
// If the image contains no tag or digest, a default tag should be applied. // If the image contains no tag or digest, a default tag should be applied.
image, err := applyDefaultImageTag(container.Image) image, err := applyDefaultImageTag(imgRef)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to apply default image tag %q: %v", imgRef, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning) m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrInvalidImageName return "", msg, ErrInvalidImageName
} }
@ -128,60 +124,60 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, pod *v1.Pod, conta
RuntimeHandler: podRuntimeHandler, RuntimeHandler: podRuntimeHandler,
} }
imageRef, err := m.imageService.GetImageRef(ctx, spec) imageRef, err = m.imageService.GetImageRef(ctx, spec)
if err != nil { if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) msg := fmt.Sprintf("Failed to inspect image %q: %v", imgRef, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning) m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrImageInspect return "", msg, ErrImageInspect
} }
present := imageRef != "" present := imageRef != ""
if !shouldPullImage(container, present) { if !shouldPullImage(pullPolicy, present) {
if present { if present {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image) msg := fmt.Sprintf("Container image %q already present on machine", imgRef)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info) m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
return imageRef, "", nil return imageRef, "", nil
} }
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", imgRef)
m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning) m.logIt(objRef, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
return "", msg, ErrImageNeverPull return "", msg, ErrImageNeverPull
} }
backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image) backOffKey := fmt.Sprintf("%s_%s", pod.UID, imgRef)
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) { if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image) msg := fmt.Sprintf("Back-off pulling image %q", imgRef)
m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info) m.logIt(objRef, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
return "", msg, ErrImagePullBackOff return "", msg, ErrImagePullBackOff
} }
m.podPullingTimeRecorder.RecordImageStartedPulling(pod.UID) m.podPullingTimeRecorder.RecordImageStartedPulling(pod.UID)
m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info) m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", imgRef), klog.Info)
startTime := time.Now() startTime := time.Now()
pullChan := make(chan pullResult) pullChan := make(chan pullResult)
m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig) m.puller.pullImage(ctx, spec, pullSecrets, pullChan, podSandboxConfig)
imagePullResult := <-pullChan imagePullResult := <-pullChan
if imagePullResult.err != nil { if imagePullResult.err != nil {
m.logIt(ref, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, imagePullResult.err), klog.Warning) m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", imgRef, imagePullResult.err), klog.Warning)
m.backOff.Next(backOffKey, m.backOff.Clock.Now()) m.backOff.Next(backOffKey, m.backOff.Clock.Now())
msg, err := evalCRIPullErr(container, imagePullResult.err) msg, err := evalCRIPullErr(imgRef, imagePullResult.err)
return "", msg, err return "", msg, err
} }
m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID) m.podPullingTimeRecorder.RecordImageFinishedPulling(pod.UID)
imagePullDuration := time.Since(startTime).Truncate(time.Millisecond) imagePullDuration := time.Since(startTime).Truncate(time.Millisecond)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v (%v including waiting). Image size: %v bytes.", m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v (%v including waiting). Image size: %v bytes.",
container.Image, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info) imgRef, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
metrics.ImagePullDuration.WithLabelValues(metrics.GetImageSizeBucket(imagePullResult.imageSize)).Observe(imagePullDuration.Seconds()) metrics.ImagePullDuration.WithLabelValues(metrics.GetImageSizeBucket(imagePullResult.imageSize)).Observe(imagePullDuration.Seconds())
m.backOff.GC() m.backOff.GC()
return imagePullResult.imageRef, "", nil return imagePullResult.imageRef, "", nil
} }
func evalCRIPullErr(container *v1.Container, err error) (errMsg string, errRes error) { func evalCRIPullErr(imgRef string, err error) (errMsg string, errRes error) {
// Error assertions via errors.Is is not supported by gRPC (remote runtime) errors right now. // Error assertions via errors.Is is not supported by gRPC (remote runtime) errors right now.
// See https://github.com/grpc/grpc-go/issues/3616 // See https://github.com/grpc/grpc-go/issues/3616
if strings.HasPrefix(err.Error(), crierrors.ErrRegistryUnavailable.Error()) { if strings.HasPrefix(err.Error(), crierrors.ErrRegistryUnavailable.Error()) {
errMsg = fmt.Sprintf( errMsg = fmt.Sprintf(
"image pull failed for %s because the registry is unavailable%s", "image pull failed for %s because the registry is unavailable%s",
container.Image, imgRef,
// Trim the error name from the message to convert errors like: // Trim the error name from the message to convert errors like:
// "RegistryUnavailable: a more detailed explanation" to: // "RegistryUnavailable: a more detailed explanation" to:
// "...because the registry is unavailable: a more detailed explanation" // "...because the registry is unavailable: a more detailed explanation"
@ -193,7 +189,7 @@ func evalCRIPullErr(container *v1.Container, err error) (errMsg string, errRes e
if strings.HasPrefix(err.Error(), crierrors.ErrSignatureValidationFailed.Error()) { if strings.HasPrefix(err.Error(), crierrors.ErrSignatureValidationFailed.Error()) {
errMsg = fmt.Sprintf( errMsg = fmt.Sprintf(
"image pull failed for %s because the signature validation failed%s", "image pull failed for %s because the signature validation failed%s",
container.Image, imgRef,
// Trim the error name from the message to convert errors like: // Trim the error name from the message to convert errors like:
// "SignatureValidationFailed: a more detailed explanation" to: // "SignatureValidationFailed: a more detailed explanation" to:
// "...because the signature validation failed: a more detailed explanation" // "...because the signature validation failed: a more detailed explanation"

View File

@ -272,7 +272,7 @@ func TestParallelPuller(t *testing.T) {
fakeRuntime.CalledFunctions = nil fakeRuntime.CalledFunctions = nil
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, "") _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls) fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err) assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@ -304,7 +304,7 @@ func TestSerializedPuller(t *testing.T) {
fakeRuntime.CalledFunctions = nil fakeRuntime.CalledFunctions = nil
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, "") _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(expected.calls) fakeRuntime.AssertCalls(expected.calls)
assert.Equal(t, expected.err, err) assert.Equal(t, expected.err, err)
assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) assert.Equal(t, expected.shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@ -367,7 +367,7 @@ func TestPullAndListImageWithPodAnnotations(t *testing.T) {
fakeRuntime.ImageList = []Image{} fakeRuntime.ImageList = []Image{}
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, "") _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls) fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0) assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@ -424,7 +424,7 @@ func TestPullAndListImageWithRuntimeHandlerInImageCriAPIFeatureGate(t *testing.T
fakeRuntime.ImageList = []Image{} fakeRuntime.ImageList = []Image{}
fakeClock.Step(time.Second) fakeClock.Step(time.Second)
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, runtimeHandler) _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, runtimeHandler, container.ImagePullPolicy)
fakeRuntime.AssertCalls(c.expected[0].calls) fakeRuntime.AssertCalls(c.expected[0].calls)
assert.Equal(t, c.expected[0].err, err, "tick=%d", 0) assert.Equal(t, c.expected[0].err, err, "tick=%d", 0)
assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded) assert.Equal(t, c.expected[0].shouldRecordStartedPullingTime, fakePodPullingTimeRecorder.startedPullingRecorded)
@ -483,7 +483,7 @@ func TestMaxParallelImagePullsLimit(t *testing.T) {
for i := 0; i < maxParallelImagePulls; i++ { for i := 0; i < maxParallelImagePulls; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, "") _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
assert.Nil(t, err) assert.Nil(t, err)
wg.Done() wg.Done()
}() }()
@ -495,7 +495,7 @@ func TestMaxParallelImagePullsLimit(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
_, _, err := puller.EnsureImageExists(ctx, pod, container, nil, nil, "") _, _, err := puller.EnsureImageExists(ctx, nil, pod, container.Image, nil, nil, "", container.ImagePullPolicy)
assert.Nil(t, err) assert.Nil(t, err)
wg.Done() wg.Done()
}() }()
@ -568,7 +568,7 @@ func TestEvalCRIPullErr(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Parallel() t.Parallel()
msg, err := evalCRIPullErr(&v1.Container{Image: "test"}, testInput) msg, err := evalCRIPullErr("test", testInput)
testAssert(msg, err) testAssert(msg, err)
}) })
} }

View File

@ -47,8 +47,8 @@ var (
// Implementations are expected to abstract the underlying runtimes. // Implementations are expected to abstract the underlying runtimes.
// Implementations are expected to be thread safe. // Implementations are expected to be thread safe.
type ImageManager interface { type ImageManager interface {
// EnsureImageExists ensures that image specified in `container` exists. // EnsureImageExists ensures that image specified by `imgRef` exists.
EnsureImageExists(ctx context.Context, pod *v1.Pod, container *v1.Container, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string) (string, string, error) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, imgRef string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (string, string, error)
// TODO(ronl): consolidating image managing and deleting operation in this interface // TODO(ronl): consolidating image managing and deleting operation in this interface
} }

View File

@ -256,12 +256,24 @@ func shouldMountHostsFile(pod *v1.Pod, podIPs []string) bool {
} }
// makeMounts determines the mount points for the given container. // makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain string, podIPs []string, podVolumes kubecontainer.VolumeMap, hu hostutil.HostUtils, subpather subpath.Interface, expandEnvs []kubecontainer.EnvVar, supportsRRO bool) ([]kubecontainer.Mount, func(), error) { func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain string, podIPs []string, podVolumes kubecontainer.VolumeMap, hu hostutil.HostUtils, subpather subpath.Interface, expandEnvs []kubecontainer.EnvVar, supportsRRO bool, imageVolumes kubecontainer.ImageVolumes) ([]kubecontainer.Mount, func(), error) {
mountEtcHostsFile := shouldMountHostsFile(pod, podIPs) mountEtcHostsFile := shouldMountHostsFile(pod, podIPs)
klog.V(3).InfoS("Creating hosts mount for container", "pod", klog.KObj(pod), "containerName", container.Name, "podIPs", podIPs, "path", mountEtcHostsFile) klog.V(3).InfoS("Creating hosts mount for container", "pod", klog.KObj(pod), "containerName", container.Name, "podIPs", podIPs, "path", mountEtcHostsFile)
mounts := []kubecontainer.Mount{} mounts := []kubecontainer.Mount{}
var cleanupAction func() var cleanupAction func()
for i, mount := range container.VolumeMounts { for i, mount := range container.VolumeMounts {
// Check if the mount is referencing an OCI volume
if imageVolumes != nil && utilfeature.DefaultFeatureGate.Enabled(features.ImageVolume) {
if image, ok := imageVolumes[mount.Name]; ok {
mounts = append(mounts, kubecontainer.Mount{
Name: mount.Name,
ContainerPath: mount.MountPath,
Image: image,
})
continue
}
}
// do not mount /etc/hosts if container is already mounting on the path // do not mount /etc/hosts if container is already mounting on the path
mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath) mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
vol, ok := podVolumes[mount.Name] vol, ok := podVolumes[mount.Name]
@ -576,7 +588,7 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string {
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by // GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container. // the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string) (*kubecontainer.RunContainerOptions, func(), error) { func (kl *Kubelet) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (*kubecontainer.RunContainerOptions, func(), error) {
supportsRRO := kl.runtimeClassSupportsRecursiveReadOnlyMounts(pod) supportsRRO := kl.runtimeClassSupportsRecursiveReadOnlyMounts(pod)
opts, err := kl.containerManager.GetResources(pod, container) opts, err := kl.containerManager.GetResources(pod, container)
@ -606,7 +618,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod,
opts.Envs = append(opts.Envs, envs...) opts.Envs = append(opts.Envs, envs...)
// only podIPs is sent to makeMounts, as podIPs is populated even if dual-stack feature flag is not enabled. // only podIPs is sent to makeMounts, as podIPs is populated even if dual-stack feature flag is not enabled.
mounts, cleanupAction, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIPs, volumes, kl.hostutil, kl.subpather, opts.Envs, supportsRRO) mounts, cleanupAction, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIPs, volumes, kl.hostutil, kl.subpather, opts.Envs, supportsRRO, imageVolumes)
if err != nil { if err != nil {
return nil, cleanupAction, err return nil, cleanupAction, err
} }

View File

@ -251,7 +251,7 @@ func TestMakeMounts(t *testing.T) {
}, },
} }
mounts, _, err := makeMounts(&pod, "/pod", &tc.container, "fakepodname", "", []string{""}, tc.podVolumes, fhu, fsp, nil, tc.supportsRRO) mounts, _, err := makeMounts(&pod, "/pod", &tc.container, "fakepodname", "", []string{""}, tc.podVolumes, fhu, fsp, nil, tc.supportsRRO, nil)
// validate only the error if we expect an error // validate only the error if we expect an error
if tc.expectErr { if tc.expectErr {

View File

@ -92,7 +92,7 @@ func TestMakeMountsWindows(t *testing.T) {
podDir, err := os.MkdirTemp("", "test-rotate-logs") podDir, err := os.MkdirTemp("", "test-rotate-logs")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(podDir) defer os.RemoveAll(podDir)
mounts, _, err := makeMounts(&pod, podDir, &container, "fakepodname", "", []string{""}, podVolumes, fhu, fsp, nil, false) mounts, _, err := makeMounts(&pod, podDir, &container, "fakepodname", "", []string{""}, podVolumes, fhu, fsp, nil, false, nil)
require.NoError(t, err) require.NoError(t, err)
expectedMounts := []kubecontainer.Mount{ expectedMounts := []kubecontainer.Mount{

View File

@ -169,22 +169,10 @@ func calcRestartCountByLogDir(path string) (int, error) {
return restartCount, nil return restartCount, nil
} }
// startContainer starts a container and returns a message indicates why it is failed on error. func (m *kubeGenericRuntimeManager) getPodRuntimeHandler(pod *v1.Pod) (podRuntimeHandler string, err error) {
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: pull the image.
// If RuntimeClassInImageCriAPI feature gate is enabled, pass runtimehandler // If RuntimeClassInImageCriAPI feature gate is enabled, pass runtimehandler
// information for the runtime class specified. If not runtime class is // information for the runtime class specified. If not runtime class is
// specified, then pass "" // specified, then pass ""
podRuntimeHandler := ""
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClassInImageCriAPI) { if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClassInImageCriAPI) {
if pod.Spec.RuntimeClassName != nil && *pod.Spec.RuntimeClassName != "" { if pod.Spec.RuntimeClassName != nil && *pod.Spec.RuntimeClassName != "" {
podRuntimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName) podRuntimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
@ -195,7 +183,30 @@ func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandb
} }
} }
imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig, podRuntimeHandler) return podRuntimeHandler, nil
}
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string, imageVolumes kubecontainer.ImageVolumes) (string, error) {
container := spec.container
// Step 1: pull the image.
podRuntimeHandler, err := m.getPodRuntimeHandler(pod)
if err != nil {
return "", err
}
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
}
imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, ref, pod, container.Image, pullSecrets, podSandboxConfig, podRuntimeHandler, container.ImagePullPolicy)
if err != nil { if err != nil {
s, _ := grpcstatus.FromError(err) s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message()) m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
@ -234,7 +245,7 @@ func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandb
return s.Message(), ErrCreateContainerConfig return s.Message(), ErrCreateContainerConfig
} }
containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target) containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, container, pod, restartCount, podIP, imageRef, podIPs, target, imageVolumes)
if cleanupAction != nil { if cleanupAction != nil {
defer cleanupAction() defer cleanupAction()
} }
@ -317,8 +328,8 @@ func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandb
} }
// generateContainerConfig generates container config for kubelet runtime v1. // generateContainerConfig generates container config for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) generateContainerConfig(ctx context.Context, container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string, podIPs []string, nsTarget *kubecontainer.ContainerID) (*runtimeapi.ContainerConfig, func(), error) { func (m *kubeGenericRuntimeManager) generateContainerConfig(ctx context.Context, container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string, podIPs []string, nsTarget *kubecontainer.ContainerID, imageVolumes kubecontainer.ImageVolumes) (*runtimeapi.ContainerConfig, func(), error) {
opts, cleanupAction, err := m.runtimeHelper.GenerateRunContainerOptions(ctx, pod, container, podIP, podIPs) opts, cleanupAction, err := m.runtimeHelper.GenerateRunContainerOptions(ctx, pod, container, podIP, podIPs, imageVolumes)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -436,6 +447,7 @@ func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerO
SelinuxRelabel: selinuxRelabel, SelinuxRelabel: selinuxRelabel,
Propagation: v.Propagation, Propagation: v.Propagation,
RecursiveReadOnly: v.RecursiveReadOnly, RecursiveReadOnly: v.RecursiveReadOnly,
Image: v.Image,
} }
volumeMounts = append(volumeMounts, mount) volumeMounts = append(volumeMounts, mount)
@ -651,6 +663,18 @@ func toKubeContainerStatus(status *runtimeapi.ContainerStatus, runtimeName strin
cStatus.ExitCode = int(status.ExitCode) cStatus.ExitCode = int(status.ExitCode)
cStatus.FinishedAt = time.Unix(0, status.FinishedAt) cStatus.FinishedAt = time.Unix(0, status.FinishedAt)
} }
for _, mount := range status.Mounts {
cStatus.Mounts = append(cStatus.Mounts, kubecontainer.Mount{
HostPath: mount.HostPath,
ContainerPath: mount.ContainerPath,
ReadOnly: mount.Readonly,
RecursiveReadOnly: mount.RecursiveReadOnly,
SELinuxRelabel: mount.SelinuxRelabel,
Propagation: mount.Propagation,
Image: mount.Image,
})
}
return cStatus return cStatus
} }

View File

@ -51,7 +51,7 @@ func makeExpectedConfig(m *kubeGenericRuntimeManager, pod *v1.Pod, containerInde
container := &pod.Spec.Containers[containerIndex] container := &pod.Spec.Containers[containerIndex]
podIP := "" podIP := ""
restartCount := 0 restartCount := 0
opts, _, _ := m.runtimeHelper.GenerateRunContainerOptions(ctx, pod, container, podIP, []string{podIP}) opts, _, _ := m.runtimeHelper.GenerateRunContainerOptions(ctx, pod, container, podIP, []string{podIP}, nil)
containerLogsPath := buildContainerLogsPath(container.Name, restartCount) containerLogsPath := buildContainerLogsPath(container.Name, restartCount)
restartCountUint32 := uint32(restartCount) restartCountUint32 := uint32(restartCount)
envs := make([]*runtimeapi.KeyValue, len(opts.Envs)) envs := make([]*runtimeapi.KeyValue, len(opts.Envs))
@ -113,7 +113,7 @@ func TestGenerateContainerConfig(t *testing.T) {
} }
expectedConfig := makeExpectedConfig(m, pod, 0, false) expectedConfig := makeExpectedConfig(m, pod, 0, false)
containerConfig, _, err := m.generateContainerConfig(ctx, &pod.Spec.Containers[0], pod, 0, "", pod.Spec.Containers[0].Image, []string{}, nil) containerConfig, _, err := m.generateContainerConfig(ctx, &pod.Spec.Containers[0], pod, 0, "", pod.Spec.Containers[0].Image, []string{}, nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, expectedConfig, containerConfig, "generate container config for kubelet runtime v1.") assert.Equal(t, expectedConfig, containerConfig, "generate container config for kubelet runtime v1.")
assert.Equal(t, runAsUser, containerConfig.GetLinux().GetSecurityContext().GetRunAsUser().GetValue(), "RunAsUser should be set") assert.Equal(t, runAsUser, containerConfig.GetLinux().GetSecurityContext().GetRunAsUser().GetValue(), "RunAsUser should be set")
@ -144,7 +144,7 @@ func TestGenerateContainerConfig(t *testing.T) {
}, },
} }
_, _, err = m.generateContainerConfig(ctx, &podWithContainerSecurityContext.Spec.Containers[0], podWithContainerSecurityContext, 0, "", podWithContainerSecurityContext.Spec.Containers[0].Image, []string{}, nil) _, _, err = m.generateContainerConfig(ctx, &podWithContainerSecurityContext.Spec.Containers[0], podWithContainerSecurityContext, 0, "", podWithContainerSecurityContext.Spec.Containers[0].Image, []string{}, nil, nil)
assert.Error(t, err) assert.Error(t, err)
imageID, _ := imageService.PullImage(ctx, &runtimeapi.ImageSpec{Image: "busybox"}, nil, nil) imageID, _ := imageService.PullImage(ctx, &runtimeapi.ImageSpec{Image: "busybox"}, nil, nil)
@ -156,7 +156,7 @@ func TestGenerateContainerConfig(t *testing.T) {
podWithContainerSecurityContext.Spec.Containers[0].SecurityContext.RunAsUser = nil podWithContainerSecurityContext.Spec.Containers[0].SecurityContext.RunAsUser = nil
podWithContainerSecurityContext.Spec.Containers[0].SecurityContext.RunAsNonRoot = &runAsNonRootTrue podWithContainerSecurityContext.Spec.Containers[0].SecurityContext.RunAsNonRoot = &runAsNonRootTrue
_, _, err = m.generateContainerConfig(ctx, &podWithContainerSecurityContext.Spec.Containers[0], podWithContainerSecurityContext, 0, "", podWithContainerSecurityContext.Spec.Containers[0].Image, []string{}, nil) _, _, err = m.generateContainerConfig(ctx, &podWithContainerSecurityContext.Spec.Containers[0], podWithContainerSecurityContext, 0, "", podWithContainerSecurityContext.Spec.Containers[0].Image, []string{}, nil, nil)
assert.Error(t, err, "RunAsNonRoot should fail for non-numeric username") assert.Error(t, err, "RunAsNonRoot should fail for non-numeric username")
} }

View File

@ -570,7 +570,7 @@ func testLifeCycleHook(t *testing.T, testPod *v1.Pod, testContainer *v1.Containe
} }
// Now try to create a container, which should in turn invoke PostStart Hook // Now try to create a container, which should in turn invoke PostStart Hook
_, err := m.startContainer(ctx, fakeSandBox.Id, fakeSandBoxConfig, containerStartSpec(testContainer), testPod, fakePodStatus, nil, "", []string{}) _, err := m.startContainer(ctx, fakeSandBox.Id, fakeSandBoxConfig, containerStartSpec(testContainer), testPod, fakePodStatus, nil, "", []string{}, nil)
if err != nil { if err != nil {
t.Errorf("startContainer error =%v", err) t.Errorf("startContainer error =%v", err)
} }

View File

@ -28,6 +28,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
grpcstatus "google.golang.org/grpc/status"
crierror "k8s.io/cri-api/pkg/errors" crierror "k8s.io/cri-api/pkg/errors"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -1218,6 +1219,13 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
return return
} }
imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets)
if err != nil {
klog.ErrorS(err, "Get image volumes for pod failed", "pod", klog.KObj(pod))
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error())
return
}
// Helper containing boilerplate common to starting all types of containers. // Helper containing boilerplate common to starting all types of containers.
// typeName is a description used to describe this type of container in log messages, // typeName is a description used to describe this type of container in log messages,
// currently: "container", "init container" or "ephemeral container" // currently: "container", "init container" or "ephemeral container"
@ -1239,8 +1247,15 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc() metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
} }
klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod)) klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
// We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user.
imageVolumes, err := m.toKubeContainerImageVolumes(imageVolumePullResults, spec.container, pod, startContainerResult)
if err != nil {
return err
}
// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs. // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil { if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes); err != nil {
// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
// useful to cluster administrators to distinguish "server errors" from "user errors". // useful to cluster administrators to distinguish "server errors" from "user errors".
metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc() metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
@ -1315,6 +1330,92 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
return return
} }
// imageVolumePulls are the pull results for each image volume name.
type imageVolumePulls = map[string]imageVolumePullResult
// imageVolumePullResult is a pull result for a single image volume.
// If spec is nil, then err and msg should be set.
// If err is nil, then spec should be set.
type imageVolumePullResult struct {
spec runtimeapi.ImageSpec
err error
msg string
}
func (m *kubeGenericRuntimeManager) toKubeContainerImageVolumes(imageVolumePullResults imageVolumePulls, container *v1.Container, pod *v1.Pod, syncResult *kubecontainer.SyncResult) (kubecontainer.ImageVolumes, error) {
if len(imageVolumePullResults) == 0 {
return nil, nil
}
imageVolumes := kubecontainer.ImageVolumes{}
var (
lastErr error
lastMsg string
)
for _, v := range container.VolumeMounts {
res, ok := imageVolumePullResults[v.Name]
if !ok {
continue
}
if res.err != nil {
s, _ := grpcstatus.FromError(res.err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
lastErr = res.err
lastMsg = res.msg
continue
}
imageVolumes[v.Name] = &res.spec
}
if lastErr != nil {
syncResult.Fail(lastErr, lastMsg)
return nil, lastErr
}
return imageVolumes, nil
}
func (m *kubeGenericRuntimeManager) getImageVolumes(ctx context.Context, pod *v1.Pod, podSandboxConfig *runtimeapi.PodSandboxConfig, pullSecrets []v1.Secret) (imageVolumePulls, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ImageVolume) {
return nil, nil
}
podRuntimeHandler, err := m.getPodRuntimeHandler(pod)
if err != nil {
klog.ErrorS(err, "Failed to get pod runtime handler", "pod", klog.KObj(pod))
return nil, err
}
res := make(imageVolumePulls)
for _, volume := range pod.Spec.Volumes {
if volume.Image == nil {
continue
}
objectRef, _ := ref.GetReference(legacyscheme.Scheme, pod) // objectRef can be nil, no error check required
ref, msg, err := m.imagePuller.EnsureImageExists(
ctx, objectRef, pod, volume.Image.Reference, pullSecrets, podSandboxConfig, podRuntimeHandler, volume.Image.PullPolicy,
)
if err != nil {
klog.ErrorS(err, "Failed to ensure image", "pod", klog.KObj(pod))
res[volume.Name] = imageVolumePullResult{err: err, msg: msg}
continue
}
klog.V(4).InfoS("Pulled image", "ref", ref, "pod", klog.KObj(pod))
res[volume.Name] = imageVolumePullResult{spec: runtimeapi.ImageSpec{
Image: ref,
UserSpecifiedImage: volume.Image.Reference,
RuntimeHandler: podRuntimeHandler,
Annotations: pod.Annotations,
}}
}
return res, nil
}
// If a container is still in backoff, the function will return a brief backoff error and // If a container is still in backoff, the function will return a brief backoff error and
// a detailed error message. // a detailed error message.
func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) { func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) {
@ -1511,6 +1612,14 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubety
}, nil }, nil
} }
func (m *kubeGenericRuntimeManager) GetContainerStatus(ctx context.Context, id kubecontainer.ContainerID) (*kubecontainer.Status, error) {
resp, err := m.runtimeService.ContainerStatus(ctx, id.ID, false)
if err != nil {
return nil, fmt.Errorf("runtime container status: %w", err)
}
return m.convertToKubeContainerStatus(resp.GetStatus()), nil
}
// GarbageCollect removes dead containers using the specified container gc policy. // GarbageCollect removes dead containers using the specified container gc policy.
func (m *kubeGenericRuntimeManager) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error { func (m *kubeGenericRuntimeManager) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
return m.containerGC.GarbageCollect(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods) return m.containerGC.GarbageCollect(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods)

View File

@ -18,6 +18,7 @@ package kuberuntime
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -48,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
imagetypes "k8s.io/kubernetes/pkg/kubelet/images"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
) )
@ -171,7 +173,7 @@ func makeFakeContainer(t *testing.T, m *kubeGenericRuntimeManager, template cont
sandboxConfig, err := m.generatePodSandboxConfig(template.pod, template.sandboxAttempt) sandboxConfig, err := m.generatePodSandboxConfig(template.pod, template.sandboxAttempt)
assert.NoError(t, err, "generatePodSandboxConfig for container template %+v", template) assert.NoError(t, err, "generatePodSandboxConfig for container template %+v", template)
containerConfig, _, err := m.generateContainerConfig(ctx, template.container, template.pod, template.attempt, "", template.container.Image, []string{}, nil) containerConfig, _, err := m.generateContainerConfig(ctx, template.container, template.pod, template.attempt, "", template.container.Image, []string{}, nil, nil)
assert.NoError(t, err, "generateContainerConfig for container template %+v", template) assert.NoError(t, err, "generateContainerConfig for container template %+v", template)
podSandboxID := apitest.BuildSandboxName(sandboxConfig.Metadata) podSandboxID := apitest.BuildSandboxName(sandboxConfig.Metadata)
@ -2578,3 +2580,131 @@ func TestUpdatePodContainerResources(t *testing.T) {
} }
} }
} }
func TestToKubeContainerImageVolumes(t *testing.T) {
_, _, manager, err := createTestRuntimeManager()
require.NoError(t, err)
const (
volume1 = "volume-1"
volume2 = "volume-2"
)
imageSpec1 := runtimeapi.ImageSpec{Image: "image-1"}
imageSpec2 := runtimeapi.ImageSpec{Image: "image-2"}
errTest := errors.New("pull failed")
syncResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, "test")
for desc, tc := range map[string]struct {
pullResults imageVolumePulls
container *v1.Container
expectedError error
expectedImageVolumes kubecontainer.ImageVolumes
}{
"empty volumes": {},
"multiple volumes": {
pullResults: imageVolumePulls{
volume1: imageVolumePullResult{spec: imageSpec1},
volume2: imageVolumePullResult{spec: imageSpec2},
},
container: &v1.Container{
VolumeMounts: []v1.VolumeMount{
{Name: volume1},
{Name: volume2},
},
},
expectedImageVolumes: kubecontainer.ImageVolumes{
volume1: &imageSpec1,
volume2: &imageSpec2,
},
},
"not matching volume": {
pullResults: imageVolumePulls{
"different": imageVolumePullResult{spec: imageSpec1},
},
container: &v1.Container{
VolumeMounts: []v1.VolumeMount{{Name: volume1}},
},
expectedImageVolumes: kubecontainer.ImageVolumes{},
},
"error in pull result": {
pullResults: imageVolumePulls{
volume1: imageVolumePullResult{err: errTest},
},
container: &v1.Container{
VolumeMounts: []v1.VolumeMount{
{Name: volume1},
},
},
expectedError: errTest,
},
} {
imageVolumes, err := manager.toKubeContainerImageVolumes(tc.pullResults, tc.container, &v1.Pod{}, syncResult)
if tc.expectedError != nil {
require.EqualError(t, err, tc.expectedError.Error())
} else {
require.NoError(t, err, desc)
}
assert.Equal(t, tc.expectedImageVolumes, imageVolumes)
}
}
func TestGetImageVolumes(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ImageVolume, true)
_, _, manager, err := createTestRuntimeManager()
require.NoError(t, err)
const (
volume1 = "volume-1"
volume2 = "volume-2"
image1 = "image-1:latest"
image2 = "image-2:latest"
)
imageSpec1 := runtimeapi.ImageSpec{Image: image1, UserSpecifiedImage: image1}
imageSpec2 := runtimeapi.ImageSpec{Image: image2, UserSpecifiedImage: image2}
for desc, tc := range map[string]struct {
pod *v1.Pod
expectedImageVolumePulls imageVolumePulls
expectedError error
}{
"empty volumes": {
pod: &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{}}},
expectedImageVolumePulls: imageVolumePulls{},
},
"multiple volumes": {
pod: &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{
{Name: volume1, VolumeSource: v1.VolumeSource{Image: &v1.ImageVolumeSource{Reference: image1, PullPolicy: v1.PullAlways}}},
{Name: volume2, VolumeSource: v1.VolumeSource{Image: &v1.ImageVolumeSource{Reference: image2, PullPolicy: v1.PullAlways}}},
}}},
expectedImageVolumePulls: imageVolumePulls{
volume1: imageVolumePullResult{spec: imageSpec1},
volume2: imageVolumePullResult{spec: imageSpec2},
},
},
"different than image volumes": {
pod: &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{
{Name: volume1, VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{}}},
}}},
expectedImageVolumePulls: imageVolumePulls{},
},
"multiple volumes but one failed to pull": {
pod: &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{
{Name: volume1, VolumeSource: v1.VolumeSource{Image: &v1.ImageVolumeSource{Reference: image1, PullPolicy: v1.PullAlways}}},
{Name: volume2, VolumeSource: v1.VolumeSource{Image: &v1.ImageVolumeSource{Reference: "image", PullPolicy: v1.PullNever}}}, // fails
}}},
expectedImageVolumePulls: imageVolumePulls{
volume1: imageVolumePullResult{spec: imageSpec1},
volume2: imageVolumePullResult{err: imagetypes.ErrImageNeverPull, msg: `Container image "image" is not present with pull policy of Never`},
},
},
} {
imageVolumePulls, err := manager.getImageVolumes(context.TODO(), tc.pod, nil, nil)
if tc.expectedError != nil {
require.EqualError(t, err, tc.expectedError.Error())
} else {
require.NoError(t, err, desc)
}
assert.Equal(t, tc.expectedImageVolumePulls, imageVolumePulls)
}
}

9
pkg/volume/image/OWNERS Normal file
View File

@ -0,0 +1,9 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-node-approvers
reviewers:
- sig-node-reviewers
labels:
- sig/node
- area/kubelet

83
pkg/volume/image/image.go Normal file
View File

@ -0,0 +1,83 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package image
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/volume"
)
// imagePlugin is the image volume plugin which acts as a stub to provide the
// functionality the volume manager expects. The real volume source
// implementation is part of the kubelet code and gated by the Kubernetes
// feature "ImageVolume"
// See: https://kep.k8s.io/4639
type imagePlugin struct {
spec *volume.Spec
volume.MetricsNil
}
var _ volume.VolumePlugin = &imagePlugin{}
var _ volume.Mounter = &imagePlugin{}
var _ volume.Unmounter = &imagePlugin{}
var _ volume.Volume = &imagePlugin{}
const pluginName = "kubernetes.io/image"
func ProbeVolumePlugins() []volume.VolumePlugin {
p := &imagePlugin{}
return []volume.VolumePlugin{p}
}
func (o *imagePlugin) Init(volume.VolumeHost) error { return nil }
func (o *imagePlugin) GetPluginName() string { return pluginName }
func (o *imagePlugin) GetVolumeName(spec *volume.Spec) (string, error) { return o.spec.Name(), nil }
func (o *imagePlugin) CanSupport(spec *volume.Spec) bool {
return spec.Volume.Image != nil
}
func (o *imagePlugin) NewMounter(spec *volume.Spec, podRef *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return o, nil
}
func (o *imagePlugin) NewUnmounter(name string, podUID types.UID) (volume.Unmounter, error) {
return o, nil
}
func (o *imagePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
return volume.ReconstructedVolume{Spec: o.spec}, nil
}
func (o *imagePlugin) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: true,
Managed: true,
SELinuxRelabel: true,
}
}
func (o *imagePlugin) GetPath() string { return "" }
func (o *imagePlugin) RequiresFSResize() bool { return false }
func (o *imagePlugin) RequiresRemount(spec *volume.Spec) bool { return false }
func (o *imagePlugin) SetUp(mounterArgs volume.MounterArgs) error { return nil }
func (o *imagePlugin) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { return nil }
func (o *imagePlugin) SupportsMountOption() bool { return false }
func (o *imagePlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) { return false, nil }
func (o *imagePlugin) TearDown() error { return nil }
func (o *imagePlugin) TearDownAt(string) error { return nil }