mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Merge pull request #33301 from yujuhong/docker_hacks
Automatic merge from submit-queue Fake container exec/logs support for in-process docker CRI integration This is necessary to unblock other work on docker integration, while we are addressing `logs` and `exec` in the meantime. This is part of #31459 and #33189 /cc @kubernetes/sig-node
This commit is contained in:
commit
509096af21
@ -20,16 +20,17 @@ import (
|
|||||||
"hash/adler32"
|
"hash/adler32"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
||||||
"k8s.io/kubernetes/third_party/forked/golang/expansion"
|
"k8s.io/kubernetes/third_party/forked/golang/expansion"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// HandlerRunner runs a lifecycle handler for a container.
|
// HandlerRunner runs a lifecycle handler for a container.
|
||||||
@ -166,3 +167,50 @@ func (irecorder *innerEventRecorder) PastEventf(object runtime.Object, timestamp
|
|||||||
func IsHostNetworkPod(pod *api.Pod) bool {
|
func IsHostNetworkPod(pod *api.Pod) bool {
|
||||||
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
|
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(random-liu): Convert PodStatus to running Pod, should be deprecated soon
|
||||||
|
func ConvertPodStatusToRunningPod(runtimeName string, podStatus *PodStatus) Pod {
|
||||||
|
runningPod := Pod{
|
||||||
|
ID: podStatus.ID,
|
||||||
|
Name: podStatus.Name,
|
||||||
|
Namespace: podStatus.Namespace,
|
||||||
|
}
|
||||||
|
for _, containerStatus := range podStatus.ContainerStatuses {
|
||||||
|
if containerStatus.State != ContainerStateRunning {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
container := &Container{
|
||||||
|
ID: containerStatus.ID,
|
||||||
|
Name: containerStatus.Name,
|
||||||
|
Image: containerStatus.Image,
|
||||||
|
ImageID: containerStatus.ImageID,
|
||||||
|
Hash: containerStatus.Hash,
|
||||||
|
State: containerStatus.State,
|
||||||
|
}
|
||||||
|
runningPod.Containers = append(runningPod.Containers, container)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate sandboxes in kubecontainer.Pod
|
||||||
|
for _, sandbox := range podStatus.SandboxStatuses {
|
||||||
|
runningPod.Sandboxes = append(runningPod.Sandboxes, &Container{
|
||||||
|
ID: ContainerID{Type: runtimeName, ID: *sandbox.Id},
|
||||||
|
State: SandboxToContainerState(*sandbox.State),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return runningPod
|
||||||
|
}
|
||||||
|
|
||||||
|
// sandboxToContainerState converts runtimeApi.PodSandboxState to
|
||||||
|
// kubecontainer.ContainerState.
|
||||||
|
// This is only needed because we need to return sandboxes as if they were
|
||||||
|
// kubecontainer.Containers to avoid substantial changes to PLEG.
|
||||||
|
// TODO: Remove this once it becomes obsolete.
|
||||||
|
func SandboxToContainerState(state runtimeApi.PodSandBoxState) ContainerState {
|
||||||
|
switch state {
|
||||||
|
case runtimeApi.PodSandBoxState_READY:
|
||||||
|
return ContainerStateRunning
|
||||||
|
case runtimeApi.PodSandBoxState_NOTREADY:
|
||||||
|
return ContainerStateExited
|
||||||
|
}
|
||||||
|
return ContainerStateUnknown
|
||||||
|
}
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||||
|
"k8s.io/kubernetes/pkg/util/term"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -69,6 +70,9 @@ type DockerLegacyService interface {
|
|||||||
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
|
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
|
||||||
kubecontainer.ContainerAttacher
|
kubecontainer.ContainerAttacher
|
||||||
PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
|
PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error
|
||||||
|
|
||||||
|
// TODO: Remove this once exec is properly defined in CRI.
|
||||||
|
ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type dockerService struct {
|
type dockerService struct {
|
||||||
|
@ -42,3 +42,16 @@ func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontaine
|
|||||||
func (ds *dockerService) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
|
func (ds *dockerService) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ds *dockerService) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||||
|
container, err := ds.client.InspectContainer(containerID.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !container.State.Running {
|
||||||
|
return fmt.Errorf("container not running (%s)", container.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := &dockertools.NativeExecHandler{}
|
||||||
|
return handler.ExecInContainer(ds.client, container, cmd, stdin, stdout, stderr, tty, resize)
|
||||||
|
}
|
||||||
|
@ -48,7 +48,6 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
@ -2052,7 +2051,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
|
|||||||
|
|
||||||
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
||||||
// TODO(random-liu): We'll use pod status directly in the future
|
// TODO(random-liu): We'll use pod status directly in the future
|
||||||
killResult := dm.killPodWithSyncResult(pod, kuberuntime.ConvertPodStatusToRunningPod(dm.Type(), podStatus), nil)
|
killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus), nil)
|
||||||
result.AddPodSyncResult(killResult)
|
result.AddPodSyncResult(killResult)
|
||||||
if killResult.Error() != nil {
|
if killResult.Error() != nil {
|
||||||
return
|
return
|
||||||
|
@ -1709,7 +1709,7 @@ func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *
|
|||||||
if runningPod != nil {
|
if runningPod != nil {
|
||||||
p = *runningPod
|
p = *runningPod
|
||||||
} else if status != nil {
|
} else if status != nil {
|
||||||
p = kuberuntime.ConvertPodStatusToRunningPod(kl.GetRuntime().Type(), status)
|
p = kubecontainer.ConvertPodStatusToRunningPod(kl.GetRuntime().Type(), status)
|
||||||
}
|
}
|
||||||
return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
|
return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
|
||||||
}
|
}
|
||||||
|
@ -77,54 +77,6 @@ func toKubeContainerState(state runtimeApi.ContainerState) kubecontainer.Contain
|
|||||||
return kubecontainer.ContainerStateUnknown
|
return kubecontainer.ContainerStateUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
// sandboxToKubeContainerState converts runtimeApi.PodSandboxState to
|
|
||||||
// kubecontainer.ContainerState.
|
|
||||||
// This is only needed because we need to return sandboxes as if they were
|
|
||||||
// kubecontainer.Containers to avoid substantial changes to PLEG.
|
|
||||||
// TODO: Remove this once it becomes obsolete.
|
|
||||||
func sandboxToKubeContainerState(state runtimeApi.PodSandBoxState) kubecontainer.ContainerState {
|
|
||||||
switch state {
|
|
||||||
case runtimeApi.PodSandBoxState_READY:
|
|
||||||
return kubecontainer.ContainerStateRunning
|
|
||||||
case runtimeApi.PodSandBoxState_NOTREADY:
|
|
||||||
return kubecontainer.ContainerStateExited
|
|
||||||
}
|
|
||||||
return kubecontainer.ContainerStateUnknown
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(random-liu): Convert PodStatus to running Pod, should be deprecated soon
|
|
||||||
func ConvertPodStatusToRunningPod(runtimeName string, podStatus *kubecontainer.PodStatus) kubecontainer.Pod {
|
|
||||||
runningPod := kubecontainer.Pod{
|
|
||||||
ID: podStatus.ID,
|
|
||||||
Name: podStatus.Name,
|
|
||||||
Namespace: podStatus.Namespace,
|
|
||||||
}
|
|
||||||
for _, containerStatus := range podStatus.ContainerStatuses {
|
|
||||||
if containerStatus.State != kubecontainer.ContainerStateRunning {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
container := &kubecontainer.Container{
|
|
||||||
ID: containerStatus.ID,
|
|
||||||
Name: containerStatus.Name,
|
|
||||||
Image: containerStatus.Image,
|
|
||||||
ImageID: containerStatus.ImageID,
|
|
||||||
Hash: containerStatus.Hash,
|
|
||||||
State: containerStatus.State,
|
|
||||||
}
|
|
||||||
runningPod.Containers = append(runningPod.Containers, container)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Need to place a sandbox in the Pod as well.
|
|
||||||
for _, sandbox := range podStatus.SandboxStatuses {
|
|
||||||
runningPod.Sandboxes = append(runningPod.Sandboxes, &kubecontainer.Container{
|
|
||||||
ID: kubecontainer.ContainerID{Type: runtimeName, ID: *sandbox.Id},
|
|
||||||
State: sandboxToKubeContainerState(*sandbox.State),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return runningPod
|
|
||||||
}
|
|
||||||
|
|
||||||
// toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol.
|
// toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol.
|
||||||
func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol {
|
func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol {
|
||||||
switch protocol {
|
switch protocol {
|
||||||
@ -166,7 +118,7 @@ func (m *kubeGenericRuntimeManager) sandboxToKubeContainer(s *runtimeApi.PodSand
|
|||||||
|
|
||||||
return &kubecontainer.Container{
|
return &kubecontainer.Container{
|
||||||
ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: s.GetId()},
|
ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: s.GetId()},
|
||||||
State: sandboxToKubeContainerState(s.GetState()),
|
State: kubecontainer.SandboxToContainerState(s.GetState()),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/dockershim"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
@ -513,6 +514,13 @@ func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID
|
|||||||
|
|
||||||
// GetContainerLogs returns logs of a specific container.
|
// GetContainerLogs returns logs of a specific container.
|
||||||
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||||
|
// Get logs directly from docker for in-process docker integration for
|
||||||
|
// now to unblock other tests.
|
||||||
|
// TODO: remove this hack after setting down on how to implement log
|
||||||
|
// retrieval/management.
|
||||||
|
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
|
||||||
|
return ds.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
|
||||||
|
}
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -521,6 +529,12 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID k
|
|||||||
// tty.
|
// tty.
|
||||||
// TODO: handle terminal resizing, refer https://github.com/kubernetes/kubernetes/issues/29579
|
// TODO: handle terminal resizing, refer https://github.com/kubernetes/kubernetes/issues/29579
|
||||||
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||||
|
// Use `docker exec` directly for in-process docker integration for
|
||||||
|
// now to unblock other tests.
|
||||||
|
// TODO: remove this hack after exec is defined in CRI.
|
||||||
|
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
|
||||||
|
return ds.ExecInContainer(containerID, cmd, stdin, stdout, stderr, tty, resize)
|
||||||
|
}
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,7 +487,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, podSt
|
|||||||
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
|
glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
|
||||||
}
|
}
|
||||||
|
|
||||||
killResult := m.killPodWithSyncResult(pod, ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
|
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
|
||||||
result.AddPodSyncResult(killResult)
|
result.AddPodSyncResult(killResult)
|
||||||
if killResult.Error() != nil {
|
if killResult.Error() != nil {
|
||||||
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
|
glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
|
||||||
|
@ -47,7 +47,6 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/leaky"
|
"k8s.io/kubernetes/pkg/kubelet/leaky"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||||
@ -1713,7 +1712,7 @@ func (r *Runtime) SyncPod(pod *api.Pod, podStatus api.PodStatus, internalPodStat
|
|||||||
// TODO: (random-liu) Stop using running pod in SyncPod()
|
// TODO: (random-liu) Stop using running pod in SyncPod()
|
||||||
// TODO: (random-liu) Rename podStatus to apiPodStatus, rename internalPodStatus to podStatus, and use new pod status as much as possible,
|
// TODO: (random-liu) Rename podStatus to apiPodStatus, rename internalPodStatus to podStatus, and use new pod status as much as possible,
|
||||||
// we may stop using apiPodStatus someday.
|
// we may stop using apiPodStatus someday.
|
||||||
runningPod := kuberuntime.ConvertPodStatusToRunningPod(r.Type(), internalPodStatus)
|
runningPod := kubecontainer.ConvertPodStatusToRunningPod(r.Type(), internalPodStatus)
|
||||||
// Add references to all containers.
|
// Add references to all containers.
|
||||||
unidentifiedContainers := make(map[kubecontainer.ContainerID]*kubecontainer.Container)
|
unidentifiedContainers := make(map[kubecontainer.ContainerID]*kubecontainer.Container)
|
||||||
for _, c := range runningPod.Containers {
|
for _, c := range runningPod.Containers {
|
||||||
|
Loading…
Reference in New Issue
Block a user