Kubelet: add kubeGenericRuntimeManager for new runtime API

This commit is contained in:
Pengfei Ni 2016-08-03 07:25:02 +08:00
parent 61a6596fa1
commit 81a79cd4c0
6 changed files with 694 additions and 0 deletions

View File

@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"io/ioutil"
"net/http"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/record"
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
kubetypes "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
type fakeHTTP struct {
url string
err error
}
func (f *fakeHTTP) Get(url string) (*http.Response, error) {
f.url = url
return nil, f.err
}
// fakeRuntimeHelper implements kubecontainer.RuntimeHelper interfaces for testing purposes.
type fakeRuntimeHelper struct{}
func (f *fakeRuntimeHelper) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
var opts kubecontainer.RunContainerOptions
if len(container.TerminationMessagePath) != 0 {
testPodContainerDir, err := ioutil.TempDir("", "fooPodContainerDir")
if err != nil {
return nil, err
}
opts.PodContainerDir = testPodContainerDir
}
return &opts, nil
}
func (f *fakeRuntimeHelper) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
return nil, nil, nil
}
// This is not used by docker runtime.
func (f *fakeRuntimeHelper) GeneratePodHostNameAndDomain(pod *api.Pod) (string, string, error) {
return "", "", nil
}
func (f *fakeRuntimeHelper) GetPodDir(kubetypes.UID) string {
return ""
}
func (f *fakeRuntimeHelper) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
return nil
}
func NewFakeKubeRuntimeManager(runtimeService internalApi.RuntimeService, imageService internalApi.ImageManagerService) (*kubeGenericRuntimeManager, error) {
networkPlugin, _ := network.InitNetworkPlugin(
[]network.NetworkPlugin{},
"",
nettest.NewFakeHost(nil),
componentconfig.HairpinNone,
"10.0.0.0/8",
)
return NewKubeGenericRuntimeManager(
&record.FakeRecorder{},
proberesults.NewManager(),
kubecontainer.NewRefManager(),
&containertest.FakeOS{},
networkPlugin,
&fakeRuntimeHelper{},
&fakeHTTP{},
flowcontrol.NewBackOff(time.Second, 300*time.Second),
false,
false,
runtimeService,
imageService,
)
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"fmt"
"io"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/term"
)
// AttachContainer attaches to the container's console
func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return fmt.Errorf("not implemented")
}
// GetContainerLogs returns logs of a specific container.
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
return fmt.Errorf("not implemented")
}
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// TODO: handle terminal resizing, refer https://github.com/kubernetes/kubernetes/issues/29579
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
return fmt.Errorf("not implemented")
}
// DeleteContainer removes a container.
func (m *kubeGenericRuntimeManager) DeleteContainer(containerID kubecontainer.ContainerID) error {
return m.runtimeService.RemoveContainer(containerID.ID)
}

View File

@ -0,0 +1,136 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/credentialprovider"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/parsers"
)
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
// TODO: pull image with qps and burst, ref https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockertools/docker.go#L120
func (m *kubeGenericRuntimeManager) PullImage(image kubecontainer.ImageSpec, pullSecrets []api.Secret) error {
img := image.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
if err != nil {
return err
}
keyring, err := credentialprovider.MakeDockerKeyring(pullSecrets, m.keyring)
if err != nil {
return err
}
imgSpec := &runtimeApi.ImageSpec{Image: &img}
creds, withCredentials := keyring.Lookup(repoToPull)
if !withCredentials {
glog.V(3).Infof("Pulling image %q without credentials", img)
err = m.imageService.PullImage(imgSpec, nil)
if err != nil {
glog.Errorf("Pull image %q failed: %v", img, err)
return err
}
return nil
}
var pullErrs []error
for _, currentCreds := range creds {
authConfig := credentialprovider.LazyProvide(currentCreds)
auth := &runtimeApi.AuthConfig{
Username: &authConfig.Username,
Password: &authConfig.Password,
Auth: &authConfig.Auth,
ServerAddress: &authConfig.ServerAddress,
IdentityToken: &authConfig.IdentityToken,
RegistryToken: &authConfig.RegistryToken,
}
err = m.imageService.PullImage(imgSpec, auth)
// If there was no error, return success
if err == nil {
return nil
}
pullErrs = append(pullErrs, err)
}
return utilerrors.NewAggregate(pullErrs)
}
// IsImagePresent checks whether the container image is already in the local storage.
func (m *kubeGenericRuntimeManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
images, err := m.imageService.ListImages(&runtimeApi.ImageFilter{
Image: &runtimeApi.ImageSpec{
Image: &image.Image,
},
})
if err != nil {
glog.Errorf("ListImages failed: %v", err)
return false, err
}
return len(images) > 0, nil
}
// ListImages gets all images currently on the machine.
func (m *kubeGenericRuntimeManager) ListImages() ([]kubecontainer.Image, error) {
var images []kubecontainer.Image
allImages, err := m.imageService.ListImages(nil)
if err != nil {
glog.Errorf("ListImages failed: %v", err)
return nil, err
}
for _, img := range allImages {
images = append(images, kubecontainer.Image{
ID: img.GetId(),
Size: int64(img.GetSize_()),
RepoTags: img.RepoTags,
RepoDigests: img.RepoDigests,
})
}
return images, nil
}
// RemoveImage removes the specified image.
func (m *kubeGenericRuntimeManager) RemoveImage(image kubecontainer.ImageSpec) error {
err := m.imageService.RemoveImage(&runtimeApi.ImageSpec{Image: &image.Image})
if err != nil {
glog.Errorf("Remove image %q failed: %v", image.Image, err)
return err
}
return nil
}
// ImageStats returns the statistics of the image.
func (m *kubeGenericRuntimeManager) ImageStats() (*kubecontainer.ImageStats, error) {
// TODO: support image stats in new runtime interface
return nil, fmt.Errorf("not implemented")
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"testing"
"github.com/stretchr/testify/assert"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/sets"
)
func TestPullImage(t *testing.T) {
_, _, fakeManager, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
err = fakeManager.PullImage(kubecontainer.ImageSpec{Image: "busybox"}, nil)
assert.NoError(t, err)
images, err := fakeManager.ListImages()
assert.NoError(t, err)
assert.Equal(t, 1, len(images))
assert.Equal(t, images[0].RepoTags, []string{"busybox"})
}
func TestListImages(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
images := []string{"1111", "2222", "3333"}
expected := sets.NewString(images...)
fakeImageService.SetFakeImages(images)
actualImages, err := fakeManager.ListImages()
assert.NoError(t, err)
actual := sets.NewString()
for _, i := range actualImages {
actual.Insert(i.ID)
}
assert.Equal(t, expected.List(), actual.List())
}
func TestIsImagePresent(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
image := "busybox"
fakeImageService.SetFakeImages([]string{image})
present, err := fakeManager.IsImagePresent(kubecontainer.ImageSpec{Image: image})
assert.NoError(t, err)
assert.Equal(t, true, present)
}
func TestRemoveImage(t *testing.T) {
_, fakeImageService, fakeManager, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
err = fakeManager.PullImage(kubecontainer.ImageSpec{Image: "busybox"}, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(fakeImageService.Images))
err = fakeManager.RemoveImage(kubecontainer.ImageSpec{Image: "busybox"})
assert.NoError(t, err)
assert.Equal(t, 0, len(fakeImageService.Images))
}

View File

@ -0,0 +1,274 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"errors"
"fmt"
"io"
"os"
"github.com/coreos/go-semver/semver"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/credentialprovider"
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/types"
kubetypes "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
const (
// The api version of kubelet runtime api
kubeRuntimeAPIVersion = "0.1.0"
// The root directory for pod logs
podLogsRootDirectory = "/var/log/pods"
)
var (
// ErrVersionNotSupported is returned when the api version of runtime interface is not supported
ErrVersionNotSupported = errors.New("Runtime api version is not supported")
)
type kubeGenericRuntimeManager struct {
runtimeName string
recorder record.EventRecorder
osInterface kubecontainer.OSInterface
containerRefManager *kubecontainer.RefManager
// Keyring for pulling images
keyring credentialprovider.DockerKeyring
// Runner of lifecycle events.
runner kubecontainer.HandlerRunner
// RuntimeHelper that wraps kubelet to generate runtime container options.
runtimeHelper kubecontainer.RuntimeHelper
// Health check results.
livenessManager proberesults.Manager
// If true, enforce container cpu limits with CFS quota support
cpuCFSQuota bool
// Network plugin.
networkPlugin network.NetworkPlugin
// wrapped image puller.
imagePuller images.ImageManager
// gRPC service clients
runtimeService internalApi.RuntimeService
imageService internalApi.ImageManagerService
}
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
recorder record.EventRecorder,
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
imageBackOff *flowcontrol.Backoff,
serializeImagePulls bool,
cpuCFSQuota bool,
runtimeService internalApi.RuntimeService,
imageService internalApi.ImageManagerService,
) (*kubeGenericRuntimeManager, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
cpuCFSQuota: cpuCFSQuota,
livenessManager: livenessManager,
containerRefManager: containerRefManager,
osInterface: osInterface,
networkPlugin: networkPlugin,
runtimeHelper: runtimeHelper,
runtimeService: runtimeService,
imageService: imageService,
keyring: credentialprovider.NewDockerKeyring(),
}
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
glog.Errorf("Get runtime version failed: %v", err)
return nil, err
}
// Only matching kubeRuntimeAPIVersion is supported now
// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
if typedVersion.GetVersion() != kubeRuntimeAPIVersion {
glog.Errorf("Runtime api version %s is not supported, only %s is supported now",
typedVersion.GetVersion(),
kubeRuntimeAPIVersion)
return nil, ErrVersionNotSupported
}
kubeRuntimeManager.runtimeName = typedVersion.GetRuntimeName()
glog.Infof("Container runtime %s initialized, version: %s, apiVersion: %s",
typedVersion.GetRuntimeName(),
typedVersion.GetRuntimeVersion(),
typedVersion.GetRuntimeApiVersion())
// If the container logs directory does not exist, create it.
// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
// new runtime interface
if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", podLogsRootDirectory, err)
}
}
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
kubeRuntimeManager,
imageBackOff,
serializeImagePulls)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
return kubeRuntimeManager, nil
}
// Type returns the type of the container runtime.
func (m *kubeGenericRuntimeManager) Type() string {
return m.runtimeName
}
// runtimeVersion implements kubecontainer.Version interface by implementing
// Compare() and String()
type runtimeVersion struct {
*semver.Version
}
func newRuntimeVersion(version string) (runtimeVersion, error) {
sem, err := semver.NewVersion(version)
if err != nil {
return runtimeVersion{}, err
}
return runtimeVersion{sem}, nil
}
func (r runtimeVersion) Compare(other string) (int, error) {
v, err := semver.NewVersion(other)
if err != nil {
return -1, err
}
if r.LessThan(*v) {
return -1, nil
}
if v.LessThan(*r.Version) {
return 1, nil
}
return 0, nil
}
// Version returns the version information of the container runtime.
func (m *kubeGenericRuntimeManager) Version() (kubecontainer.Version, error) {
typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
glog.Errorf("Get remote runtime version failed: %v", err)
return nil, err
}
return newRuntimeVersion(typedVersion.GetVersion())
}
// APIVersion returns the cached API version information of the container
// runtime. Implementation is expected to update this cache periodically.
// This may be different from the runtime engine's version.
func (m *kubeGenericRuntimeManager) APIVersion() (kubecontainer.Version, error) {
typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
glog.Errorf("Get remote runtime version failed: %v", err)
return nil, err
}
return newRuntimeVersion(typedVersion.GetRuntimeApiVersion())
}
// Status returns error if the runtime is unhealthy; nil otherwise.
func (m *kubeGenericRuntimeManager) Status() error {
_, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
if err != nil {
glog.Errorf("Checkout remote runtime status failed: %v", err)
return err
}
return nil
}
// GetPods returns a list of containers grouped by pods. The boolean parameter
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
return nil, fmt.Errorf("not implemented")
}
// SyncPod syncs the running pod into the desired pod.
func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus,
podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret,
backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
result.Fail(fmt.Errorf("not implemented"))
return
}
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
// gracePeriodOverride if specified allows the caller to override the pod default grace period.
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
// it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
func (m *kubeGenericRuntimeManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
return fmt.Errorf("not implemented")
}
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visble in Runtime.
func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
return nil, fmt.Errorf("not implemented")
}
// Returns the filesystem path of the pod's network namespace; if the
// runtime does not handle namespace creation itself, or cannot return
// the network namespace path, it should return an error.
// TODO: Change ContainerID to a Pod ID since the namespace is shared
// by all containers in the pod.
func (m *kubeGenericRuntimeManager) GetNetNS(containerID kubecontainer.ContainerID) (string, error) {
return "", fmt.Errorf("not implemented")
}
// GetPodContainerID gets pod sandbox ID
func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (kubecontainer.ContainerID, error) {
return kubecontainer.ContainerID{}, fmt.Errorf("not implemented")
}
// Forward the specified port from the specified pod to the stream.
func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
return fmt.Errorf("not implemented")
}
// GarbageCollect removes dead containers using the specified container gc policy
func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
return fmt.Errorf("not implemented")
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kuberuntime
import (
"testing"
"github.com/stretchr/testify/assert"
apitest "k8s.io/kubernetes/pkg/kubelet/api/testing"
)
func createTestFakeRuntimeManager() (*apitest.FakeRuntimeService, *apitest.FakeImageService, *kubeGenericRuntimeManager, error) {
fakeRuntimeService := apitest.NewFakeRuntimeService()
fakeImageService := apitest.NewFakeImageService()
manager, err := NewFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService)
return fakeRuntimeService, fakeImageService, manager, err
}
func TestNewKubeRuntimeManager(t *testing.T) {
_, _, _, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
}
func TestVersion(t *testing.T) {
_, _, m, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
version, err := m.Version()
assert.NoError(t, err)
assert.Equal(t, kubeRuntimeAPIVersion, version.String())
}
func TestContainerRuntimeType(t *testing.T) {
_, _, m, err := createTestFakeRuntimeManager()
assert.NoError(t, err)
runtimeType := m.Type()
assert.Equal(t, "fakeRuntime", runtimeType)
}