Implement RuntimeClass support for the Kubelet & CRI

This commit is contained in:
Tim Allclair 2018-08-27 13:12:45 -07:00
parent be11540775
commit 63f3bc1b7e
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
19 changed files with 468 additions and 25 deletions

View File

@ -111,6 +111,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/authentication/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/authorization/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/authorization/v1beta1:go_default_library",

View File

@ -48,6 +48,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -546,14 +547,16 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
// if in standalone mode, indicate as much by setting all clients to nil // if in standalone mode, indicate as much by setting all clients to nil
if standaloneMode { if standaloneMode {
kubeDeps.KubeClient = nil kubeDeps.KubeClient = nil
kubeDeps.DynamicKubeClient = nil
kubeDeps.EventClient = nil kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil kubeDeps.HeartbeatClient = nil
glog.Warningf("standalone mode, no API client") glog.Warningf("standalone mode, no API client")
} else if kubeDeps.KubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil { } else if kubeDeps.KubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil || kubeDeps.DynamicKubeClient == nil {
// initialize clients if not standalone mode and any of the clients are not provided // initialize clients if not standalone mode and any of the clients are not provided
var kubeClient clientset.Interface var kubeClient clientset.Interface
var eventClient v1core.EventsGetter var eventClient v1core.EventsGetter
var heartbeatClient clientset.Interface var heartbeatClient clientset.Interface
var dynamicKubeClient dynamic.Interface
clientConfig, err := createAPIServerClientConfig(s) clientConfig, err := createAPIServerClientConfig(s)
if err != nil { if err != nil {
@ -583,6 +586,10 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests()) clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())
clientCertificateManager.Start() clientCertificateManager.Start()
} }
dynamicKubeClient, err = dynamic.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("Failed to initialize dynamic KubeClient: %v", err)
}
// make a separate client for events // make a separate client for events
eventClientConfig := *clientConfig eventClientConfig := *clientConfig
@ -617,6 +624,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
} }
kubeDeps.KubeClient = kubeClient kubeDeps.KubeClient = kubeClient
kubeDeps.DynamicKubeClient = dynamicKubeClient
if heartbeatClient != nil { if heartbeatClient != nil {
kubeDeps.HeartbeatClient = heartbeatClient kubeDeps.HeartbeatClient = heartbeatClient
kubeDeps.OnHeartbeatFailure = closeAllConns kubeDeps.OnHeartbeatFailure = closeAllConns

View File

@ -75,6 +75,7 @@ go_library(
"//pkg/kubelet/prober:go_default_library", "//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/remote:go_default_library", "//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/runtimeclass:go_default_library",
"//pkg/kubelet/secret:go_default_library", "//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library", "//pkg/kubelet/server/portforward:go_default_library",
@ -127,6 +128,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@ -287,6 +289,7 @@ filegroup(
"//pkg/kubelet/prober:all-srcs", "//pkg/kubelet/prober:all-srcs",
"//pkg/kubelet/qos:all-srcs", "//pkg/kubelet/qos:all-srcs",
"//pkg/kubelet/remote:all-srcs", "//pkg/kubelet/remote:all-srcs",
"//pkg/kubelet/runtimeclass:all-srcs",
"//pkg/kubelet/secret:all-srcs", "//pkg/kubelet/secret:all-srcs",
"//pkg/kubelet/server:all-srcs", "//pkg/kubelet/server:all-srcs",
"//pkg/kubelet/stats:all-srcs", "//pkg/kubelet/stats:all-srcs",

View File

@ -63,7 +63,7 @@ type ContainerManager interface {
type PodSandboxManager interface { type PodSandboxManager interface {
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state. // the sandbox is in ready state.
RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
// StopPodSandbox stops the sandbox. If there are any running containers in the // StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated. // sandbox, they should be force terminated.
StopPodSandbox(podSandboxID string) error StopPodSandbox(podSandboxID string) error

View File

@ -35,6 +35,8 @@ var (
type FakePodSandbox struct { type FakePodSandbox struct {
// PodSandboxStatus contains the runtime information for a sandbox. // PodSandboxStatus contains the runtime information for a sandbox.
runtimeapi.PodSandboxStatus runtimeapi.PodSandboxStatus
// RuntimeHandler is the runtime handler that was issued with the RunPodSandbox request.
RuntimeHandler string
} }
type FakeContainer struct { type FakeContainer struct {
@ -154,7 +156,7 @@ func (r *FakeRuntimeService) Status() (*runtimeapi.RuntimeStatus, error) {
return r.FakeStatus, nil return r.FakeStatus, nil
} }
func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) { func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -176,6 +178,7 @@ func (r *FakeRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig)
Labels: config.Labels, Labels: config.Labels,
Annotations: config.Annotations, Annotations: config.Annotations,
}, },
RuntimeHandler: runtimeHandler,
} }
return podSandboxID, nil return podSandboxID, nil

View File

@ -96,6 +96,9 @@ func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPod
} }
// Step 2: Create the sandbox container. // Step 2: Create the sandbox container.
if r.GetRuntimeHandler() != "" {
return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
}
createConfig, err := ds.makeSandboxDockerConfig(config, image) createConfig, err := ds.makeSandboxDockerConfig(config, image)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)

View File

@ -45,6 +45,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
@ -87,6 +88,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote" "k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/secret" "k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server"
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats" serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -245,6 +247,7 @@ type Dependencies struct {
OnHeartbeatFailure func() OnHeartbeatFailure func()
KubeClient clientset.Interface KubeClient clientset.Interface
CSIClient csiclientset.Interface CSIClient csiclientset.Interface
DynamicKubeClient dynamic.Interface
Mounter mount.Interface Mounter mount.Interface
OOMAdjuster *oom.OOMAdjuster OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface OSInterface kubecontainer.OSInterface
@ -653,6 +656,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err return nil, err
} }
klet.runtimeService = runtimeService klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.DynamicKubeClient)
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager( runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder), kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager, klet.livenessManager,
@ -673,6 +681,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
imageService, imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(), kubeDeps.ContainerManager.InternalContainerLifecycle(),
legacyLogProvider, legacyLogProvider,
klet.runtimeClassManager,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -1192,6 +1201,9 @@ type Kubelet struct {
// This flag indicates that kubelet should start plugin watcher utility server for discovering Kubelet plugins // This flag indicates that kubelet should start plugin watcher utility server for discovering Kubelet plugins
enablePluginsWatcher bool enablePluginsWatcher bool
// Handles RuntimeClass objects for the Kubelet.
runtimeClassManager *runtimeclass.Manager
} }
func allGlobalUnicastIPs() ([]net.IP, error) { func allGlobalUnicastIPs() ([]net.IP, error) {
@ -1412,6 +1424,11 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.statusManager.Start() kl.statusManager.Start()
kl.probeManager.Start() kl.probeManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
go kl.runtimeClassManager.Run(wait.NeverStop)
}
// Start the pod lifecycle event generator. // Start the pod lifecycle event generator.
kl.pleg.Start() kl.pleg.Start()
kl.syncLoop(updates, kl) kl.syncLoop(updates, kl)

View File

@ -45,6 +45,7 @@ go_library(
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/runtimeclass:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/cache:go_default_library", "//pkg/kubelet/util/cache:go_default_library",
"//pkg/kubelet/util/format:go_default_library", "//pkg/kubelet/util/format:go_default_library",
@ -106,6 +107,8 @@ go_test(
"//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/runtimeclass:go_default_library",
"//pkg/kubelet/runtimeclass/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
@ -119,6 +122,7 @@ go_test(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
], ],
) )

View File

@ -176,11 +176,11 @@ func (in instrumentedRuntimeService) Attach(req *runtimeapi.AttachRequest) (*run
return resp, err return resp, err
} }
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) { func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
const operation = "run_podsandbox" const operation = "run_podsandbox"
defer recordOperation(operation, time.Now()) defer recordOperation(operation, time.Now())
out, err := in.service.RunPodSandbox(config) out, err := in.service.RunPodSandbox(config, runtimeHandler)
recordError(operation, err) recordError(operation, err)
return out, err return out, err
} }

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/cache"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
@ -119,6 +120,9 @@ type kubeGenericRuntimeManager struct {
// A shim to legacy functions for backward compatibility. // A shim to legacy functions for backward compatibility.
legacyLogProvider LegacyLogProvider legacyLogProvider LegacyLogProvider
// Manage RuntimeClass resources.
runtimeClassManager *runtimeclass.Manager
} }
type KubeGenericRuntime interface { type KubeGenericRuntime interface {
@ -154,6 +158,7 @@ func NewKubeGenericRuntimeManager(
imageService internalapi.ImageManagerService, imageService internalapi.ImageManagerService,
internalLifecycle cm.InternalContainerLifecycle, internalLifecycle cm.InternalContainerLifecycle,
legacyLogProvider LegacyLogProvider, legacyLogProvider LegacyLogProvider,
runtimeClassManager *runtimeclass.Manager,
) (KubeGenericRuntime, error) { ) (KubeGenericRuntime, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{ kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder, recorder: recorder,
@ -170,6 +175,7 @@ func NewKubeGenericRuntimeManager(
keyring: credentialprovider.NewDockerKeyring(), keyring: credentialprovider.NewDockerKeyring(),
internalLifecycle: internalLifecycle, internalLifecycle: internalLifecycle,
legacyLogProvider: legacyLogProvider, legacyLogProvider: legacyLogProvider,
runtimeClassManager: runtimeClassManager,
} }
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion) typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)

View File

@ -50,7 +50,16 @@ func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32
return "", message, err return "", message, err
} }
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig) runtimeHandler := ""
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) {
runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
return "", message, err
}
}
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
if err != nil { if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err) message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
glog.Error(message) glog.Error(message)

View File

@ -22,31 +22,24 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/kubernetes/pkg/features"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
rctest "k8s.io/kubernetes/pkg/kubelet/runtimeclass/testing"
"k8s.io/utils/pointer"
) )
// TestCreatePodSandbox tests creating sandbox and its corresponding pod log directory. // TestCreatePodSandbox tests creating sandbox and its corresponding pod log directory.
func TestCreatePodSandbox(t *testing.T) { func TestCreatePodSandbox(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager() fakeRuntime, _, m, err := createTestRuntimeManager()
pod := &v1.Pod{ require.NoError(t, err)
ObjectMeta: metav1.ObjectMeta{ pod := newTestPod()
UID: "12345678",
Name: "bar",
Namespace: "new",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "foo",
Image: "busybox",
ImagePullPolicy: v1.PullIfNotPresent,
},
},
},
}
fakeOS := m.osInterface.(*containertest.FakeOS) fakeOS := m.osInterface.(*containertest.FakeOS)
fakeOS.MkdirAllFn = func(path string, perm os.FileMode) error { fakeOS.MkdirAllFn = func(path string, perm os.FileMode) error {
@ -63,3 +56,60 @@ func TestCreatePodSandbox(t *testing.T) {
assert.Equal(t, len(sandboxes), 1) assert.Equal(t, len(sandboxes), 1)
// TODO Check pod sandbox configuration // TODO Check pod sandbox configuration
} }
// TestCreatePodSandbox_RuntimeClass tests creating sandbox with RuntimeClasses enabled.
func TestCreatePodSandbox_RuntimeClass(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RuntimeClass, true)()
rcm := runtimeclass.NewManager(rctest.NewPopulatedDynamicClient())
defer rctest.StartManagerSync(t, rcm)()
fakeRuntime, _, m, err := createTestRuntimeManager()
require.NoError(t, err)
m.runtimeClassManager = rcm
tests := map[string]struct {
rcn *string
expectedHandler string
expectError bool
}{
"unspecified RuntimeClass": {rcn: nil, expectedHandler: ""},
"valid RuntimeClass": {rcn: pointer.StringPtr(rctest.SandboxRuntimeClass), expectedHandler: rctest.SandboxRuntimeHandler},
"missing RuntimeClass": {rcn: pointer.StringPtr("phantom"), expectError: true},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
pod := newTestPod()
pod.Spec.RuntimeClassName = test.rcn
id, _, err := m.createPodSandbox(pod, 1)
if test.expectError {
assert.Error(t, err)
assert.Contains(t, fakeRuntime.Called, "RunPodSandbox")
} else {
assert.NoError(t, err)
assert.Contains(t, fakeRuntime.Called, "RunPodSandbox")
assert.Equal(t, test.expectedHandler, fakeRuntime.Sandboxes[id].RuntimeHandler)
}
})
}
}
func newTestPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345678",
Name: "bar",
Namespace: "new",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "foo",
Image: "busybox",
ImagePullPolicy: v1.PullIfNotPresent,
},
},
},
}
}

View File

@ -77,7 +77,7 @@ func (f *RemoteRuntime) Version(ctx context.Context, req *kubeapi.VersionRequest
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
// the sandbox is in the ready state on success. // the sandbox is in the ready state on success.
func (f *RemoteRuntime) RunPodSandbox(ctx context.Context, req *kubeapi.RunPodSandboxRequest) (*kubeapi.RunPodSandboxResponse, error) { func (f *RemoteRuntime) RunPodSandbox(ctx context.Context, req *kubeapi.RunPodSandboxRequest) (*kubeapi.RunPodSandboxResponse, error) {
sandboxID, err := f.RuntimeService.RunPodSandbox(req.Config) sandboxID, err := f.RuntimeService.RunPodSandbox(req.Config, req.RuntimeHandler)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -79,14 +79,15 @@ func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeapi.VersionRe
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state. // the sandbox is in ready state.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (string, error) { func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
// Use 2 times longer timeout for sandbox operation (4 mins by default) // Use 2 times longer timeout for sandbox operation (4 mins by default)
// TODO: Make the pod sandbox timeout configurable. // TODO: Make the pod sandbox timeout configurable.
ctx, cancel := getContextWithTimeout(r.timeout * 2) ctx, cancel := getContextWithTimeout(r.timeout * 2)
defer cancel() defer cancel()
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{ resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config, Config: config,
RuntimeHandler: runtimeHandler,
}) })
if err != nil { if err != nil {
glog.Errorf("RunPodSandbox from runtime service failed: %v", err) glog.Errorf("RunPodSandbox from runtime service failed: %v", err)

View File

@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["runtimeclass_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/runtimeclass",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/runtimeclass/testing:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["runtimeclass_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/runtimeclass/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -0,0 +1,97 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtimeclass
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
var (
runtimeClassGVR = schema.GroupVersionResource{
Group: "node.k8s.io",
Version: "v1alpha1",
Resource: "runtimeclasses",
}
)
// Manager caches RuntimeClass API objects, and provides accessors to the Kubelet.
type Manager struct {
informer cache.SharedInformer
}
// NewManager returns a new RuntimeClass Manager. Run must be called before the manager can be used.
func NewManager(client dynamic.Interface) *Manager {
rc := client.Resource(runtimeClassGVR)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return rc.List(options)
},
WatchFunc: rc.Watch,
}
informer := cache.NewSharedInformer(lw, &unstructured.Unstructured{}, 0)
return &Manager{
informer: informer,
}
}
// Run starts syncing the RuntimeClass cache with the apiserver.
func (m *Manager) Run(stopCh <-chan struct{}) {
m.informer.Run(stopCh)
}
// LookupRuntimeHandler returns the RuntimeHandler string associated with the given RuntimeClass
// name (or the default of "" for nil). If the RuntimeClass is not found, it returns an
// apierrors.NotFound error.
func (m *Manager) LookupRuntimeHandler(runtimeClassName *string) (string, error) {
if runtimeClassName == nil || *runtimeClassName == "" {
// The default RuntimeClass always resolves to the empty runtime handler.
return "", nil
}
name := *runtimeClassName
item, exists, err := m.informer.GetStore().GetByKey(name)
if err != nil {
return "", fmt.Errorf("Failed to lookup RuntimeClass %s: %v", name, err)
}
if !exists {
return "", errors.NewNotFound(schema.GroupResource{
Group: runtimeClassGVR.Group,
Resource: runtimeClassGVR.Resource,
}, name)
}
rc, ok := item.(*unstructured.Unstructured)
if !ok {
return "", fmt.Errorf("unexpected RuntimeClass type %T", item)
}
handler, _, err := unstructured.NestedString(rc.Object, "spec", "runtimeHandler")
if err != nil {
return "", fmt.Errorf("Invalid RuntimeClass object: %v", err)
}
return handler, nil
}

View File

@ -0,0 +1,61 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtimeclass_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
rctest "k8s.io/kubernetes/pkg/kubelet/runtimeclass/testing"
"k8s.io/utils/pointer"
)
func TestLookupRuntimeHandler(t *testing.T) {
tests := []struct {
rcn *string
expected string
expectError bool
}{
{rcn: nil, expected: ""},
{rcn: pointer.StringPtr(""), expected: ""},
{rcn: pointer.StringPtr(rctest.EmptyRuntimeClass), expected: ""},
{rcn: pointer.StringPtr(rctest.SandboxRuntimeClass), expected: "kata-containers"},
{rcn: pointer.StringPtr(rctest.InvalidRuntimeClass), expectError: true},
{rcn: pointer.StringPtr("phantom"), expectError: true},
}
manager := runtimeclass.NewManager(rctest.NewPopulatedDynamicClient())
defer rctest.StartManagerSync(t, manager)()
for _, test := range tests {
tname := "nil"
if test.rcn != nil {
tname = *test.rcn
}
t.Run(fmt.Sprintf("%q->%q(err:%v)", tname, test.expected, test.expectError), func(t *testing.T) {
handler, err := manager.LookupRuntimeHandler(test.rcn)
if test.expectError {
assert.Error(t, err, "handler=%q", handler)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, handler)
}
})
}
}

View File

@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["fake_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/runtimeclass/testing",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/runtimeclass:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/fake:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,102 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/utils/pointer"
)
const (
// SandboxRuntimeClass is a valid RuntimeClass pre-populated in the populated dynamic client.
SandboxRuntimeClass = "sandbox"
// SandboxRuntimeHandler is the handler associated with the SandboxRuntimeClass.
SandboxRuntimeHandler = "kata-containers"
// EmptyRuntimeClass is a valid RuntimeClass without a handler pre-populated in the populated dynamic client.
EmptyRuntimeClass = "native"
// InvalidRuntimeClass is an invalid RuntimeClass pre-populated in the populated dynamic client.
InvalidRuntimeClass = "foo"
)
// NewPopulatedDynamicClient creates a dynamic client for use with the runtimeclass.Manager,
// and populates it with a few test RuntimeClass objects.
func NewPopulatedDynamicClient() dynamic.Interface {
invalidRC := NewUnstructuredRuntimeClass(InvalidRuntimeClass, "")
invalidRC.Object["spec"].(map[string]interface{})["runtimeHandler"] = true
client := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(),
NewUnstructuredRuntimeClass(EmptyRuntimeClass, ""),
NewUnstructuredRuntimeClass(SandboxRuntimeClass, SandboxRuntimeHandler),
invalidRC,
)
return client
}
// StartManagerSync runs the manager, and waits for startup by polling for the expected "native"
// RuntimeClass to be populated. Returns a function to stop the manager, which should be called with
// a defer:
// defer StartManagerSync(t, m)()
// Any errors are considered fatal to the test.
func StartManagerSync(t *testing.T, m *runtimeclass.Manager) func() {
stopCh := make(chan struct{})
go m.Run(stopCh)
// Wait for informer to populate.
err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
_, err := m.LookupRuntimeHandler(pointer.StringPtr(EmptyRuntimeClass))
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
})
require.NoError(t, err, "Failed to start manager")
return func() {
close(stopCh)
}
}
// NewUnstructuredRuntimeClass is a helper to generate an unstructured RuntimeClass resource with
// the given name & handler.
func NewUnstructuredRuntimeClass(name, handler string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "node.k8s.io/v1alpha1",
"kind": "RuntimeClass",
"metadata": map[string]interface{}{
"name": name,
},
"spec": map[string]interface{}{
"runtimeHandler": handler,
},
},
}
}