mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 18:24:07 +00:00
Remove RunInContainer interface in Kuberlete Runtime interface
This commit is contained in:
parent
ae5065a2c2
commit
831203c19b
@ -112,9 +112,6 @@ type ContainerAttacher interface {
|
|||||||
|
|
||||||
// CommandRunner encapsulates the command runner interfaces for testability.
|
// CommandRunner encapsulates the command runner interfaces for testability.
|
||||||
type ContainerCommandRunner interface {
|
type ContainerCommandRunner interface {
|
||||||
// TODO(vmarmol): Merge RunInContainer and ExecInContainer.
|
|
||||||
// Runs the command in the container of the specified pod.
|
|
||||||
RunInContainer(containerID ContainerID, cmd []string) ([]byte, error)
|
|
||||||
// Runs the command in the container of the specified pod using nsenter.
|
// Runs the command in the container of the specified pod using nsenter.
|
||||||
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
|
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
|
||||||
// tty.
|
// tty.
|
||||||
|
@ -276,14 +276,6 @@ func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader,
|
|||||||
return f.Err
|
return f.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FakeRuntime) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
|
|
||||||
f.Lock()
|
|
||||||
defer f.Unlock()
|
|
||||||
|
|
||||||
f.CalledFunctions = append(f.CalledFunctions, "RunInContainer")
|
|
||||||
return []byte{}, f.Err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeRuntime) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
func (f *FakeRuntime) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||||
f.Lock()
|
f.Lock()
|
||||||
defer f.Unlock()
|
defer f.Unlock()
|
||||||
|
@ -98,11 +98,6 @@ func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout,
|
|||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Mock) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
|
|
||||||
args := r.Called(containerID, cmd)
|
|
||||||
return args.Get(0).([]byte), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Mock) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
func (r *Mock) GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||||
args := r.Called(pod, containerID, logOptions, stdout, stderr)
|
args := r.Called(pod, containerID, logOptions, stdout, stderr)
|
||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
|
@ -982,53 +982,6 @@ func (dm *DockerManager) defaultSecurityOpt() ([]string, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunInContainer run the command inside the container identified by containerID
|
|
||||||
func (dm *DockerManager) RunInContainer(containerID kubecontainer.ContainerID, cmd []string) ([]byte, error) {
|
|
||||||
glog.V(2).Infof("Using docker native exec to run cmd %+v inside container %s", cmd, containerID)
|
|
||||||
createOpts := dockertypes.ExecConfig{
|
|
||||||
Cmd: cmd,
|
|
||||||
AttachStdin: false,
|
|
||||||
AttachStdout: true,
|
|
||||||
AttachStderr: true,
|
|
||||||
Tty: false,
|
|
||||||
}
|
|
||||||
execObj, err := dm.client.CreateExec(containerID.ID, createOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err)
|
|
||||||
}
|
|
||||||
var buf bytes.Buffer
|
|
||||||
startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: false}
|
|
||||||
streamOpts := StreamOptions{
|
|
||||||
OutputStream: &buf,
|
|
||||||
ErrorStream: &buf,
|
|
||||||
RawTerminal: false,
|
|
||||||
}
|
|
||||||
err = dm.client.StartExec(execObj.ID, startOpts, streamOpts)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(2).Infof("StartExec With error: %v", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ticker := time.NewTicker(2 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
inspect, err2 := dm.client.InspectExec(execObj.ID)
|
|
||||||
if err2 != nil {
|
|
||||||
glog.V(2).Infof("InspectExec %s failed with error: %+v", execObj.ID, err2)
|
|
||||||
return buf.Bytes(), err2
|
|
||||||
}
|
|
||||||
if !inspect.Running {
|
|
||||||
if inspect.ExitCode != 0 {
|
|
||||||
glog.V(2).Infof("InspectExec %s exit with result %+v", execObj.ID, inspect)
|
|
||||||
err = &dockerExitError{inspect}
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
<-ticker.C
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.Bytes(), err
|
|
||||||
}
|
|
||||||
|
|
||||||
type dockerExitError struct {
|
type dockerExitError struct {
|
||||||
Inspect *dockertypes.ContainerExecInspect
|
Inspect *dockertypes.ContainerExecInspect
|
||||||
}
|
}
|
||||||
|
@ -477,7 +477,7 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
|
|||||||
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
|
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
|
||||||
t.Errorf("container was not stopped correctly: %v", err)
|
t.Errorf("container was not stopped correctly: %v", err)
|
||||||
}
|
}
|
||||||
verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
|
verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
|
||||||
if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
|
if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
|
||||||
t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
|
t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
|
||||||
}
|
}
|
||||||
|
@ -64,6 +64,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/securitycontext"
|
"k8s.io/kubernetes/pkg/securitycontext"
|
||||||
@ -3456,7 +3457,15 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
|
|||||||
if container == nil {
|
if container == nil {
|
||||||
return nil, fmt.Errorf("container not found (%q)", containerName)
|
return nil, fmt.Errorf("container not found (%q)", containerName)
|
||||||
}
|
}
|
||||||
return kl.runner.RunInContainer(container.ID, cmd)
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
output := ioutils.WriteCloserWrapper(&buffer)
|
||||||
|
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecInContainer executes a command in a container, connecting the supplied
|
// ExecInContainer executes a command in a container, connecting the supplied
|
||||||
|
@ -1026,12 +1026,6 @@ type fakeContainerCommandRunner struct {
|
|||||||
Stream io.ReadWriteCloser
|
Stream io.ReadWriteCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
|
|
||||||
f.Cmd = cmd
|
|
||||||
f.ID = id
|
|
||||||
return []byte{}, f.E
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
|
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
|
||||||
f.Cmd = cmd
|
f.Cmd = cmd
|
||||||
f.ID = id
|
f.ID = id
|
||||||
|
@ -21,10 +21,12 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"bytes"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/intstr"
|
"k8s.io/kubernetes/pkg/util/intstr"
|
||||||
)
|
)
|
||||||
@ -50,7 +52,9 @@ func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontain
|
|||||||
func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||||
switch {
|
switch {
|
||||||
case handler.Exec != nil:
|
case handler.Exec != nil:
|
||||||
_, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
|
var buffer bytes.Buffer
|
||||||
|
output := ioutils.WriteCloserWrapper(&buffer)
|
||||||
|
err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false)
|
||||||
return err
|
return err
|
||||||
case handler.HTTPGet != nil:
|
case handler.HTTPGet != nil:
|
||||||
return hr.runHTTPHandler(pod, container, handler)
|
return hr.runHTTPHandler(pod, container, handler)
|
||||||
|
@ -77,13 +77,9 @@ type fakeContainerCommandRunner struct {
|
|||||||
ID kubecontainer.ContainerID
|
ID kubecontainer.ContainerID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
|
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
|
||||||
f.Cmd = cmd
|
f.Cmd = cmd
|
||||||
f.ID = id
|
f.ID = id
|
||||||
return []byte{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package prober
|
package prober
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -30,6 +31,7 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||||
"k8s.io/kubernetes/pkg/probe"
|
"k8s.io/kubernetes/pkg/probe"
|
||||||
execprobe "k8s.io/kubernetes/pkg/probe/exec"
|
execprobe "k8s.io/kubernetes/pkg/probe/exec"
|
||||||
httprobe "k8s.io/kubernetes/pkg/probe/http"
|
httprobe "k8s.io/kubernetes/pkg/probe/http"
|
||||||
@ -219,7 +221,14 @@ type execInContainer struct {
|
|||||||
|
|
||||||
func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd {
|
func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd {
|
||||||
return execInContainer{func() ([]byte, error) {
|
return execInContainer{func() ([]byte, error) {
|
||||||
return p.runner.RunInContainer(containerID, cmd)
|
var buffer bytes.Buffer
|
||||||
|
output := ioutils.WriteCloserWrapper(&buffer)
|
||||||
|
err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer.Bytes(), nil
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1463,28 +1463,6 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: In rkt, the container ID is in the form of "UUID:appName", where
|
|
||||||
// appName is the container name.
|
|
||||||
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
|
|
||||||
func (r *Runtime) RunInContainer(containerID kubecontainer.ContainerID, cmd []string) ([]byte, error) {
|
|
||||||
glog.V(4).Infof("Rkt running in container.")
|
|
||||||
|
|
||||||
id, err := parseContainerID(containerID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
args := append([]string{}, "enter", fmt.Sprintf("--app=%s", id.appName), id.uuid)
|
|
||||||
args = append(args, cmd...)
|
|
||||||
|
|
||||||
result, err := r.buildCommand(args...).CombinedOutput()
|
|
||||||
if err != nil {
|
|
||||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
||||||
err = &rktExitError{exitErr}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// rktExitError implemets /pkg/util/exec.ExitError interface.
|
// rktExitError implemets /pkg/util/exec.ExitError interface.
|
||||||
type rktExitError struct{ *exec.ExitError }
|
type rktExitError struct{ *exec.ExitError }
|
||||||
|
|
||||||
|
37
pkg/kubelet/util/ioutils/ioutils.go
Normal file
37
pkg/kubelet/util/ioutils/ioutils.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package ioutils
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
// writeCloserWrapper represents a WriteCloser whose closer operation is noop.
|
||||||
|
type writeCloserWrapper struct {
|
||||||
|
Writer io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writeCloserWrapper) Write(buf []byte) (int, error) {
|
||||||
|
return w.Writer.Write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writeCloserWrapper) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteCloserWrapper returns a writeCloserWrapper.
|
||||||
|
func WriteCloserWrapper(w io.Writer) io.WriteCloser {
|
||||||
|
return &writeCloserWrapper{w}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user