Kubelet now implements the V1 podresources API

This commit is contained in:
Renaud Gaubert 2020-10-10 15:27:55 -07:00
parent 68cf24c087
commit 817bf784d2
9 changed files with 295 additions and 28 deletions

View File

@ -5,13 +5,16 @@ go_library(
srcs = [
"client.go",
"constants.go",
"server.go",
"server_v1.go",
"server_v1alpha1.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/podresources",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1alpha1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
@ -19,12 +22,16 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["server_test.go"],
srcs = [
"server_v1_test.go",
"server_v1alpha1_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1alpha1:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
],

View File

@ -23,12 +23,17 @@ import (
"google.golang.org/grpc"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// GetClient returns a client for the PodResourcesLister grpc service
func GetClient(socket string, connectionTimeout time.Duration, maxMsgSize int) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
// Note: Consumers of the pod resources API should not be importing this package.
// They should copy paste the function in their project.
// GetV1alpha1Client returns a client for the PodResourcesLister grpc service
// Note: This is deprecated
func GetV1alpha1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1alpha1.PodResourcesListerClient, *grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(socket)
if err != nil {
return nil, nil, err
@ -40,5 +45,21 @@ func GetClient(socket string, connectionTimeout time.Duration, maxMsgSize int) (
if err != nil {
return nil, nil, fmt.Errorf("error dialing socket %s: %v", socket, err)
}
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
return v1alpha1.NewPodResourcesListerClient(conn), conn, nil
}
// GetV1Client returns a client for the PodResourcesLister grpc service
func GetV1Client(socket string, connectionTimeout time.Duration, maxMsgSize int) (v1.PodResourcesListerClient, *grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(socket)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, nil, fmt.Errorf("error dialing socket %s: %v", socket, err)
}
return v1.NewPodResourcesListerClient(conn), conn, nil
}

View File

@ -0,0 +1,76 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"context"
"k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)
// podResourcesServerV1alpha1 implements PodResourcesListerServer
type v1PodResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
}
// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1.PodResourcesListerServer {
return &v1PodResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
}
}
func alphaDevicesToV1(alphaDevs []*v1alpha1.ContainerDevices) []*v1.ContainerDevices {
var devs []*v1.ContainerDevices
for _, alphaDev := range alphaDevs {
dev := v1.ContainerDevices(*alphaDev)
devs = append(devs, &dev)
}
return devs
}
// List returns information about the resources assigned to pods on the node
func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResourcesRequest) (*v1.ListPodResourcesResponse, error) {
pods := p.podsProvider.GetPods()
podResources := make([]*v1.PodResources, len(pods))
p.devicesProvider.UpdateAllocatedDevices()
for i, pod := range pods {
pRes := v1.PodResources{
Name: pod.Name,
Namespace: pod.Namespace,
Containers: make([]*v1.ContainerResources, len(pod.Spec.Containers)),
}
for j, container := range pod.Spec.Containers {
pRes.Containers[j] = &v1.ContainerResources{
Name: container.Name,
Devices: alphaDevicesToV1(p.devicesProvider.GetDevices(string(pod.UID), container.Name)),
}
}
podResources[i] = &pRes
}
return &v1.ListPodResourcesResponse{
PodResources: podResources,
}, nil
}

View File

@ -0,0 +1,139 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"context"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)
func TestListPodResourcesV1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
containerName := "container-name"
devs := []*v1alpha1.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0", "dev1"},
},
}
for _, tc := range []struct {
desc string
pods []*v1.Pod
devices []*v1alpha1.ContainerDevices
expectedResponse *podresourcesapi.ListPodResourcesResponse
}{
{
desc: "no pods",
pods: []*v1.Pod{},
devices: []*v1alpha1.ContainerDevices{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{},
},
{
desc: "pod without devices",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
},
},
},
},
},
devices: []*v1alpha1.ContainerDevices{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
Name: podName,
Namespace: podNamespace,
Containers: []*podresourcesapi.ContainerResources{
{
Name: containerName,
Devices: []*podresourcesapi.ContainerDevices{},
},
},
},
},
},
},
{
desc: "pod with devices",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
},
},
},
},
},
devices: devs,
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
Name: podName,
Namespace: podNamespace,
Containers: []*podresourcesapi.ContainerResources{
{
Name: containerName,
Devices: alphaDevicesToV1(devs),
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
m := new(mockProvider)
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("UpdateAllocatedDevices").Return()
server := NewV1PodResourcesServer(m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
if tc.expectedResponse.String() != resp.String() {
t.Errorf("want resp = %s, got %s", tc.expectedResponse.String(), resp.String())
}
})
}
}

View File

@ -19,38 +19,26 @@ package podresources
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)
// DevicesProvider knows how to provide the devices used by the given container
type DevicesProvider interface {
GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices
UpdateAllocatedDevices()
}
// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetPods() []*v1.Pod
}
// podResourcesServer implements PodResourcesListerServer
type podResourcesServer struct {
// podResourcesServerV1alpha1 implements PodResourcesListerServer
type v1alpha1PodResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
}
// NewPodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// NewV1alpha1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1alpha1.PodResourcesListerServer {
return &podResourcesServer{
func NewV1alpha1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1alpha1.PodResourcesListerServer {
return &v1alpha1PodResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
}
}
// List returns information about the resources assigned to pods on the node
func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) (*v1alpha1.ListPodResourcesResponse, error) {
func (p *v1alpha1PodResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) (*v1alpha1.ListPodResourcesResponse, error) {
pods := p.podsProvider.GetPods()
podResources := make([]*v1alpha1.PodResources, len(pods))
p.devicesProvider.UpdateAllocatedDevices()

View File

@ -46,7 +46,7 @@ func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}
func TestListPodResources(t *testing.T) {
func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
@ -145,7 +145,7 @@ func TestListPodResources(t *testing.T) {
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("UpdateAllocatedDevices").Return()
server := NewPodResourcesServer(m, m)
server := NewV1alpha1PodResourcesServer(m, m)
resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)

View File

@ -0,0 +1,33 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"k8s.io/api/core/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
)
// DevicesProvider knows how to provide the devices used by the given container
type DevicesProvider interface {
GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices
UpdateAllocatedDevices()
}
// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetPods() []*v1.Pod
}

View File

@ -50,6 +50,7 @@ go_library(
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1alpha1:go_default_library",
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/google/cadvisor/container:go_default_library",

View File

@ -60,7 +60,8 @@ import (
"k8s.io/component-base/logs"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
@ -183,7 +184,8 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
server := grpc.NewServer()
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider))
l, err := util.CreateListener(socket)
if err != nil {
klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)