Merge pull request #58282 from vikaschoudhary16/per-container-allocate

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Invoke preStart RPC call before container start, if desired by plugin

**What this PR does / why we need it**:
1. Adds a new RPC `preStart` to device plugin API
2. Updates `Register` RPC handling to receive a flag from device plugins indicating whether kubelet should invoke the `preStart` RPC before starting a container.
3. Changes in device manager to invoke `preStart` before container start
4. Test case updates


**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #56943 #56307 


**Special notes for your reviewer**:

**Release note**:

```release-note
None
```
/sig node

/area hw-accelerators
/cc @jiayingz @RenaudWasTaken @vishh @ScorpioCPH @sjenning @derekwaynecarr @jeremyeder @lichuqiang @tengqm
This commit is contained in:
Kubernetes Submit Queue 2018-02-21 13:07:26 -08:00 committed by GitHub
commit e8dd75f37d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 861 additions and 186 deletions

View File

@ -37,5 +37,4 @@ filegroup(
# Publicly visible filegroup containing the api.proto source file.
filegroup(
name = "go_default_library_protos",
srcs = ["api.proto"],
visibility = ["//visibility:public"],
)

View File

@ -25,10 +25,13 @@ limitations under the License.
api.proto
It has these top-level messages:
DevicePluginOptions
RegisterRequest
Empty
ListAndWatchResponse
Device
PreStartContainerRequest
PreStartContainerResponse
AllocateRequest
AllocateResponse
Mount
@ -63,6 +66,22 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
// DevicePluginOptions is the protobuf message holding a single boolean option,
// PreStartRequired, encoded as varint field number 1.
type DevicePluginOptions struct {
// Indicates if PreStartContainer call is required before each container start
PreStartRequired bool `protobuf:"varint,1,opt,name=pre_start_required,json=preStartRequired,proto3" json:"pre_start_required,omitempty"`
}
// Reset overwrites the receiver with a zero-valued DevicePluginOptions.
func (m *DevicePluginOptions) Reset() {
	*m = DevicePluginOptions{}
}

// ProtoMessage is an empty marker method.
func (*DevicePluginOptions) ProtoMessage() {
}

// Descriptor returns the file descriptor bytes together with the index path
// ([]int{0}) of this message.
func (*DevicePluginOptions) Descriptor() ([]byte, []int) {
	path := []int{0}
	return fileDescriptorApi, path
}
// GetPreStartRequired returns the PreStartRequired field, or false when the
// receiver is nil.
func (m *DevicePluginOptions) GetPreStartRequired() bool {
	if m == nil {
		return false
	}
	return m.PreStartRequired
}
type RegisterRequest struct {
// Version of the API the Device Plugin was built against
Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
@ -71,11 +90,13 @@ type RegisterRequest struct {
Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
// Schedulable resource name. As of now it's expected to be a DNS Label
ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"`
// Options to be communicated with Device Manager
Options *DevicePluginOptions `protobuf:"bytes,4,opt,name=options" json:"options,omitempty"`
}
func (m *RegisterRequest) Reset() { *m = RegisterRequest{} }
func (*RegisterRequest) ProtoMessage() {}
func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{0} }
func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} }
func (m *RegisterRequest) GetVersion() string {
if m != nil {
@ -98,12 +119,19 @@ func (m *RegisterRequest) GetResourceName() string {
return ""
}
// GetOptions returns the Options field, or nil when the receiver is nil.
func (m *RegisterRequest) GetOptions() *DevicePluginOptions {
	if m == nil {
		return nil
	}
	return m.Options
}
type Empty struct {
}
func (m *Empty) Reset() { *m = Empty{} }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} }
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} }
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state changes or a Device disappears, ListAndWatch
@ -114,7 +142,7 @@ type ListAndWatchResponse struct {
func (m *ListAndWatchResponse) Reset() { *m = ListAndWatchResponse{} }
func (*ListAndWatchResponse) ProtoMessage() {}
func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} }
func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} }
func (m *ListAndWatchResponse) GetDevices() []*Device {
if m != nil {
@ -139,7 +167,7 @@ type Device struct {
func (m *Device) Reset() { *m = Device{} }
func (*Device) ProtoMessage() {}
func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} }
func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} }
func (m *Device) GetID() string {
if m != nil {
@ -155,6 +183,33 @@ func (m *Device) GetHealth() string {
return ""
}
// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
type PreStartContainerRequest struct {
// DevicesIDs is a repeated string field, encoded as protobuf field number 1.
DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}
// Reset overwrites the receiver with a zero-valued PreStartContainerRequest.
func (m *PreStartContainerRequest) Reset() {
	*m = PreStartContainerRequest{}
}

// ProtoMessage is an empty marker method.
func (*PreStartContainerRequest) ProtoMessage() {
}

// Descriptor returns the file descriptor bytes together with the index path
// ([]int{5}) of this message.
func (*PreStartContainerRequest) Descriptor() ([]byte, []int) {
	path := []int{5}
	return fileDescriptorApi, path
}
// GetDevicesIDs returns the DevicesIDs field, or nil when the receiver is nil.
func (m *PreStartContainerRequest) GetDevicesIDs() []string {
	if m == nil {
		return nil
	}
	return m.DevicesIDs
}
// PreStartContainerResponse will be sent by the plugin in response to PreStartContainerRequest.
// It currently carries no fields.
type PreStartContainerResponse struct {
}
// Reset overwrites the receiver with a zero-valued PreStartContainerResponse.
func (m *PreStartContainerResponse) Reset() {
	*m = PreStartContainerResponse{}
}

// ProtoMessage is an empty marker method.
func (*PreStartContainerResponse) ProtoMessage() {
}

// Descriptor returns the file descriptor bytes together with the index path
// ([]int{6}) of this message.
func (*PreStartContainerResponse) Descriptor() ([]byte, []int) {
	path := []int{6}
	return fileDescriptorApi, path
}
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to expose additional artifacts in a pod's
@ -167,7 +222,7 @@ type AllocateRequest struct {
func (m *AllocateRequest) Reset() { *m = AllocateRequest{} }
func (*AllocateRequest) ProtoMessage() {}
func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} }
func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} }
func (m *AllocateRequest) GetDevicesIDs() []string {
if m != nil {
@ -197,7 +252,7 @@ type AllocateResponse struct {
func (m *AllocateResponse) Reset() { *m = AllocateResponse{} }
func (*AllocateResponse) ProtoMessage() {}
func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} }
func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{8} }
func (m *AllocateResponse) GetEnvs() map[string]string {
if m != nil {
@ -240,7 +295,7 @@ type Mount struct {
func (m *Mount) Reset() { *m = Mount{} }
func (*Mount) ProtoMessage() {}
func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} }
func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{9} }
func (m *Mount) GetContainerPath() string {
if m != nil {
@ -278,7 +333,7 @@ type DeviceSpec struct {
func (m *DeviceSpec) Reset() { *m = DeviceSpec{} }
func (*DeviceSpec) ProtoMessage() {}
func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} }
func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{10} }
func (m *DeviceSpec) GetContainerPath() string {
if m != nil {
@ -302,10 +357,13 @@ func (m *DeviceSpec) GetPermissions() string {
}
func init() {
proto.RegisterType((*DevicePluginOptions)(nil), "v1beta1.DevicePluginOptions")
proto.RegisterType((*RegisterRequest)(nil), "v1beta1.RegisterRequest")
proto.RegisterType((*Empty)(nil), "v1beta1.Empty")
proto.RegisterType((*ListAndWatchResponse)(nil), "v1beta1.ListAndWatchResponse")
proto.RegisterType((*Device)(nil), "v1beta1.Device")
proto.RegisterType((*PreStartContainerRequest)(nil), "v1beta1.PreStartContainerRequest")
proto.RegisterType((*PreStartContainerResponse)(nil), "v1beta1.PreStartContainerResponse")
proto.RegisterType((*AllocateRequest)(nil), "v1beta1.AllocateRequest")
proto.RegisterType((*AllocateResponse)(nil), "v1beta1.AllocateResponse")
proto.RegisterType((*Mount)(nil), "v1beta1.Mount")
@ -395,6 +453,10 @@ type DevicePluginClient interface {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error)
}
type devicePluginClient struct {
@ -446,6 +508,15 @@ func (c *devicePluginClient) Allocate(ctx context.Context, in *AllocateRequest,
return out, nil
}
// PreStartContainer invokes the "/v1beta1.DevicePlugin/PreStartContainer"
// method over the client's connection, passing in as the request. It returns
// the decoded response, or nil and the error reported by grpc.Invoke.
func (c *devicePluginClient) PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error) {
	resp := new(PreStartContainerResponse)
	if err := grpc.Invoke(ctx, "/v1beta1.DevicePlugin/PreStartContainer", in, resp, c.cc, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// Server API for DevicePlugin service
type DevicePluginServer interface {
@ -457,6 +528,10 @@ type DevicePluginServer interface {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}
func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) {
@ -502,6 +577,24 @@ func _DevicePlugin_Allocate_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
// _DevicePlugin_PreStartContainer_Handler decodes a PreStartContainerRequest
// with dec and dispatches it to srv's PreStartContainer method. When an
// interceptor is supplied, the call is routed through it with the method's
// full name; otherwise the server method is called directly.
func _DevicePlugin_PreStartContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := new(PreStartContainerRequest)
	if err := dec(request); err != nil {
		return nil, err
	}
	if interceptor != nil {
		serverInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: "/v1beta1.DevicePlugin/PreStartContainer",
		}
		// invoke is what the interceptor calls to reach the real implementation.
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(DevicePluginServer).PreStartContainer(ctx, req.(*PreStartContainerRequest))
		}
		return interceptor(ctx, request, serverInfo, invoke)
	}
	return srv.(DevicePluginServer).PreStartContainer(ctx, request)
}
var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1beta1.DevicePlugin",
HandlerType: (*DevicePluginServer)(nil),
@ -510,6 +603,10 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
MethodName: "Allocate",
Handler: _DevicePlugin_Allocate_Handler,
},
{
MethodName: "PreStartContainer",
Handler: _DevicePlugin_PreStartContainer_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@ -521,6 +618,34 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
Metadata: "api.proto",
}
// Marshal allocates a buffer of m.Size() bytes, encodes m into it with
// MarshalTo, and returns the written prefix of the buffer. On a MarshalTo
// error it returns nil and that error.
func (m *DevicePluginOptions) Marshal() (dAtA []byte, err error) {
	dAtA = make([]byte, m.Size())
	written, err := m.MarshalTo(dAtA)
	if err != nil {
		return nil, err
	}
	return dAtA[:written], nil
}
// MarshalTo writes the wire encoding of m into dAtA and returns the number of
// bytes written. Nothing is written when PreStartRequired is false; when it
// is true the key byte 0x8 is written, followed by the value byte 1.
func (m *DevicePluginOptions) MarshalTo(dAtA []byte) (int, error) {
	if !m.PreStartRequired {
		return 0, nil
	}
	pos := 0
	dAtA[pos] = 0x8
	pos++
	// Inside this branch the flag is known to be true, so the value byte is 1.
	dAtA[pos] = 1
	pos++
	return pos, nil
}
func (m *RegisterRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -554,6 +679,16 @@ func (m *RegisterRequest) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintApi(dAtA, i, uint64(len(m.ResourceName)))
i += copy(dAtA[i:], m.ResourceName)
}
if m.Options != nil {
dAtA[i] = 0x22
i++
i = encodeVarintApi(dAtA, i, uint64(m.Options.Size()))
n1, err := m.Options.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
@ -635,6 +770,57 @@ func (m *Device) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
// Marshal allocates a buffer of m.Size() bytes, encodes m into it with
// MarshalTo, and returns the written prefix of the buffer. On a MarshalTo
// error it returns nil and that error.
func (m *PreStartContainerRequest) Marshal() (dAtA []byte, err error) {
	dAtA = make([]byte, m.Size())
	written, err := m.MarshalTo(dAtA)
	if err != nil {
		return nil, err
	}
	return dAtA[:written], nil
}
// MarshalTo writes the wire encoding of m into dAtA and returns the number of
// bytes written. Each entry of DevicesIDs is emitted as the key byte 0xa, the
// entry's length as a base-128 varint, and then the entry's bytes.
func (m *PreStartContainerRequest) MarshalTo(dAtA []byte) (int, error) {
	pos := 0
	for _, id := range m.DevicesIDs {
		dAtA[pos] = 0xa
		pos++
		// Emit the length seven bits at a time, setting the high bit on
		// every byte except the last.
		remaining := len(id)
		for remaining >= 1<<7 {
			dAtA[pos] = uint8(uint64(remaining)&0x7f | 0x80)
			remaining >>= 7
			pos++
		}
		dAtA[pos] = uint8(remaining)
		pos++
		pos += copy(dAtA[pos:], id)
	}
	return pos, nil
}
// Marshal allocates a buffer of m.Size() bytes, encodes m into it with
// MarshalTo, and returns the written prefix of the buffer. On a MarshalTo
// error it returns nil and that error.
func (m *PreStartContainerResponse) Marshal() (dAtA []byte, err error) {
	dAtA = make([]byte, m.Size())
	written, err := m.MarshalTo(dAtA)
	if err != nil {
		return nil, err
	}
	return dAtA[:written], nil
}
// MarshalTo writes nothing, because the message has no fields, and reports
// zero bytes written.
func (m *PreStartContainerResponse) MarshalTo(dAtA []byte) (int, error) {
	return 0, nil
}
func (m *AllocateRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -847,6 +1033,15 @@ func encodeVarintApi(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return offset + 1
}
// Size returns the encoded length of m in bytes: 2 when PreStartRequired is
// set, otherwise 0.
func (m *DevicePluginOptions) Size() (n int) {
	if !m.PreStartRequired {
		return 0
	}
	return 2
}
func (m *RegisterRequest) Size() (n int) {
var l int
_ = l
@ -862,6 +1057,10 @@ func (m *RegisterRequest) Size() (n int) {
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
if m.Options != nil {
l = m.Options.Size()
n += 1 + l + sovApi(uint64(l))
}
return n
}
@ -897,6 +1096,24 @@ func (m *Device) Size() (n int) {
return n
}
// Size returns the encoded length of m in bytes. Each DevicesIDs entry
// contributes one key byte, sovApi(length) bytes for the length prefix, and
// the entry's own bytes.
func (m *PreStartContainerRequest) Size() (n int) {
	for _, id := range m.DevicesIDs {
		idLen := len(id)
		n += 1 + idLen + sovApi(uint64(idLen))
	}
	return n
}
// Size always returns 0, because the message has no fields.
func (m *PreStartContainerResponse) Size() (n int) {
	return 0
}
func (m *AllocateRequest) Size() (n int) {
var l int
_ = l
@ -991,6 +1208,16 @@ func sovApi(x uint64) (n int) {
func sozApi(x uint64) (n int) {
return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
// String renders the message as "&DevicePluginOptions{PreStartRequired:<v>,}",
// or "nil" when the receiver is nil.
func (m *DevicePluginOptions) String() string {
	if m == nil {
		return "nil"
	}
	parts := []string{
		`&DevicePluginOptions{`,
		`PreStartRequired:` + fmt.Sprintf("%v", m.PreStartRequired) + `,`,
		`}`,
	}
	return strings.Join(parts, "")
}
func (this *RegisterRequest) String() string {
if this == nil {
return "nil"
@ -999,6 +1226,7 @@ func (this *RegisterRequest) String() string {
`Version:` + fmt.Sprintf("%v", this.Version) + `,`,
`Endpoint:` + fmt.Sprintf("%v", this.Endpoint) + `,`,
`ResourceName:` + fmt.Sprintf("%v", this.ResourceName) + `,`,
`Options:` + strings.Replace(fmt.Sprintf("%v", this.Options), "DevicePluginOptions", "DevicePluginOptions", 1) + `,`,
`}`,
}, "")
return s
@ -1033,6 +1261,25 @@ func (this *Device) String() string {
}, "")
return s
}
// String renders the message as "&PreStartContainerRequest{DevicesIDs:<v>,}",
// or "nil" when the receiver is nil.
func (m *PreStartContainerRequest) String() string {
	if m == nil {
		return "nil"
	}
	parts := []string{
		`&PreStartContainerRequest{`,
		`DevicesIDs:` + fmt.Sprintf("%v", m.DevicesIDs) + `,`,
		`}`,
	}
	return strings.Join(parts, "")
}
// String renders the message as "&PreStartContainerResponse{}", or "nil" when
// the receiver is nil.
func (m *PreStartContainerResponse) String() string {
	if m == nil {
		return "nil"
	}
	parts := []string{
		`&PreStartContainerResponse{`,
		`}`,
	}
	return strings.Join(parts, "")
}
func (this *AllocateRequest) String() string {
if this == nil {
return "nil"
@ -1108,6 +1355,76 @@ func valueToStringApi(v interface{}) string {
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
// Unmarshal decodes the wire-format bytes in dAtA into m.
//
// The input is read as a sequence of (key, value) pairs. Field 1 must use
// wire type 0 and is stored in PreStartRequired as "value != 0"; every other
// field number is skipped using skipApi. Errors are returned for a varint
// that does not terminate within 64 bits of shift (ErrIntOverflowApi),
// input that ends early (io.ErrUnexpectedEOF), an end-group wire type,
// a non-positive field number, or a wrong wire type for field 1.
func (m *DevicePluginOptions) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
// Remember where this key starts so unknown fields can be re-read by skipApi.
preIndex := iNdEx
var wire uint64
// Decode the field key as a base-128 varint.
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
// The key packs the field number in the upper bits and the wire type in the low 3 bits.
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: DevicePluginOptions: wiretype end group for non-group")
}
if fieldNum <= 0 {
// NOTE(review): the second argument is the full key (wire), not wireType,
// although the message labels it "wire type".
return fmt.Errorf("proto: DevicePluginOptions: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PreStartRequired", wireType)
}
var v int
// Decode the varint value of the field.
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.PreStartRequired = bool(v != 0)
default:
// Unknown field: rewind to the key and advance by the length skipApi reports.
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *RegisterRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1224,6 +1541,39 @@ func (m *RegisterRequest) Unmarshal(dAtA []byte) error {
}
m.ResourceName = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Options", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Options == nil {
m.Options = &DevicePluginOptions{}
}
if err := m.Options.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
@ -1484,6 +1834,135 @@ func (m *Device) Unmarshal(dAtA []byte) error {
}
return nil
}
// Unmarshal decodes the wire-format bytes in dAtA into m.
//
// The input is read as a sequence of (key, value) pairs. Field 1 must use
// wire type 2 (length-delimited); each occurrence is appended to DevicesIDs,
// so existing entries in m are kept. Every other field number is skipped
// using skipApi. Errors are returned for a varint that does not terminate
// within 64 bits of shift (ErrIntOverflowApi), input that ends early
// (io.ErrUnexpectedEOF), a negative length (ErrInvalidLengthApi), an
// end-group wire type, a non-positive field number, or a wrong wire type
// for field 1.
func (m *PreStartContainerRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
// Remember where this key starts so unknown fields can be re-read by skipApi.
preIndex := iNdEx
var wire uint64
// Decode the field key as a base-128 varint.
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
// The key packs the field number in the upper bits and the wire type in the low 3 bits.
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PreStartContainerRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
// NOTE(review): the second argument is the full key (wire), not wireType,
// although the message labels it "wire type".
return fmt.Errorf("proto: PreStartContainerRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field DevicesIDs", wireType)
}
var stringLen uint64
// Decode the varint length prefix of the string.
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
// Copy the payload bytes into a new string and append it to the list.
m.DevicesIDs = append(m.DevicesIDs, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
// Unknown field: rewind to the key and advance by the length skipApi reports.
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
// Unmarshal decodes the wire-format bytes in dAtA.
//
// The message has no known fields, so every (key, value) pair is validated
// and then skipped using skipApi; m itself is never modified. Errors are
// returned for a key varint that does not terminate within 64 bits of shift
// (ErrIntOverflowApi), input that ends early (io.ErrUnexpectedEOF), an
// end-group wire type, or a non-positive field number.
func (m *PreStartContainerResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
// Remember where this key starts so the field can be re-read by skipApi.
preIndex := iNdEx
var wire uint64
// Decode the field key as a base-128 varint.
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
// The key packs the field number in the upper bits and the wire type in the low 3 bits.
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PreStartContainerResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
// NOTE(review): the second argument is the full key (wire), not wireType,
// although the message labels it "wire type".
return fmt.Errorf("proto: PreStartContainerResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
// Rewind to the key and advance by the length skipApi reports.
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *AllocateRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -2280,43 +2759,48 @@ var (
func init() { proto.RegisterFile("api.proto", fileDescriptorApi) }
var fileDescriptorApi = []byte{
// 594 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4b, 0x6f, 0xd3, 0x4c,
0x14, 0x8d, 0x93, 0x36, 0x8f, 0xdb, 0xf4, 0xa1, 0xf9, 0xaa, 0x4f, 0xc6, 0x80, 0x55, 0xb9, 0x02,
0x15, 0xa4, 0xa6, 0x0f, 0x24, 0x8a, 0x58, 0x20, 0x19, 0xa5, 0x48, 0x95, 0x0a, 0x54, 0x66, 0xc1,
0x32, 0x9a, 0x38, 0x97, 0x78, 0x84, 0x3d, 0x63, 0x3c, 0x63, 0x4b, 0xd9, 0xf1, 0x13, 0x58, 0xf0,
0xa3, 0xba, 0x64, 0xc9, 0x92, 0x86, 0x15, 0xff, 0x02, 0x65, 0xfc, 0x48, 0x64, 0xd1, 0x05, 0x12,
0x3b, 0xdf, 0x73, 0xef, 0xb1, 0xcf, 0x9c, 0xb9, 0xc7, 0xd0, 0xa3, 0x31, 0x1b, 0xc4, 0x89, 0x50,
0x82, 0x74, 0xb2, 0x93, 0x31, 0x2a, 0x7a, 0x62, 0x1d, 0x4e, 0x99, 0x0a, 0xd2, 0xf1, 0xc0, 0x17,
0xd1, 0xd1, 0x54, 0x4c, 0xc5, 0x91, 0xee, 0x8f, 0xd3, 0x0f, 0xba, 0xd2, 0x85, 0x7e, 0xca, 0x79,
0x4e, 0x08, 0xdb, 0x1e, 0x4e, 0x99, 0x54, 0x98, 0x78, 0xf8, 0x29, 0x45, 0xa9, 0x88, 0x09, 0x9d,
0x0c, 0x13, 0xc9, 0x04, 0x37, 0x8d, 0x3d, 0xe3, 0xa0, 0xe7, 0x95, 0x25, 0xb1, 0xa0, 0x8b, 0x7c,
0x12, 0x0b, 0xc6, 0x95, 0xd9, 0xd4, 0xad, 0xaa, 0x26, 0xfb, 0xb0, 0x99, 0xa0, 0x14, 0x69, 0xe2,
0xe3, 0x88, 0xd3, 0x08, 0xcd, 0x96, 0x1e, 0xe8, 0x97, 0xe0, 0x1b, 0x1a, 0xa1, 0xd3, 0x81, 0xf5,
0xf3, 0x28, 0x56, 0x33, 0xc7, 0x85, 0xdd, 0x4b, 0x26, 0x95, 0xcb, 0x27, 0xef, 0xa9, 0xf2, 0x03,
0x0f, 0x65, 0x2c, 0xb8, 0x44, 0xf2, 0x08, 0x3a, 0x13, 0xcc, 0x98, 0x8f, 0xd2, 0x34, 0xf6, 0x5a,
0x07, 0x1b, 0xa7, 0xdb, 0x83, 0xe2, 0x60, 0x83, 0xa1, 0xc6, 0xbd, 0xb2, 0xef, 0x1c, 0x43, 0x3b,
0x87, 0xc8, 0x16, 0x34, 0x2f, 0x86, 0x85, 0xd6, 0x26, 0x1b, 0x92, 0xff, 0xa1, 0x1d, 0x20, 0x0d,
0x55, 0x50, 0x88, 0x2c, 0x2a, 0xe7, 0x04, 0xb6, 0xdd, 0x30, 0x14, 0x3e, 0x55, 0x58, 0x9e, 0xd5,
0x06, 0x28, 0xde, 0x77, 0x31, 0xcc, 0x3f, 0xd9, 0xf3, 0x56, 0x10, 0xe7, 0x57, 0x13, 0x76, 0x96,
0x9c, 0x42, 0xe4, 0x19, 0xac, 0x21, 0xcf, 0x4a, 0x85, 0xfb, 0x95, 0xc2, 0xfa, 0xe0, 0xe0, 0x9c,
0x67, 0xf2, 0x9c, 0xab, 0x64, 0xe6, 0x69, 0x02, 0x79, 0x08, 0xed, 0x48, 0xa4, 0x5c, 0x49, 0xb3,
0xa9, 0xa9, 0x5b, 0x15, 0xf5, 0xf5, 0x02, 0xf6, 0x8a, 0x2e, 0x39, 0x5c, 0xba, 0xd0, 0xd2, 0x83,
0xff, 0xd5, 0x5c, 0x78, 0x17, 0xa3, 0x5f, 0x39, 0x41, 0x2e, 0x61, 0x83, 0x72, 0x2e, 0x14, 0x55,
0x4c, 0x70, 0x69, 0xae, 0x69, 0xca, 0xe3, 0xdb, 0x65, 0xb9, 0xcb, 0xe1, 0x5c, 0xdd, 0x2a, 0xdd,
0x3a, 0x83, 0x5e, 0xa5, 0x9b, 0xec, 0x40, 0xeb, 0x23, 0xce, 0x0a, 0x6f, 0x17, 0x8f, 0x64, 0x17,
0xd6, 0x33, 0x1a, 0xa6, 0x58, 0x78, 0x9b, 0x17, 0xcf, 0x9b, 0xcf, 0x0c, 0xeb, 0x05, 0xec, 0xd4,
0xdf, 0xfc, 0x37, 0x7c, 0x27, 0x80, 0x75, 0x6d, 0x03, 0x79, 0x00, 0x5b, 0xbe, 0xe0, 0x8a, 0x32,
0x8e, 0xc9, 0x28, 0xa6, 0x2a, 0x28, 0xf8, 0x9b, 0x15, 0x7a, 0x45, 0x55, 0x40, 0xee, 0x42, 0x2f,
0x10, 0x52, 0xe5, 0x13, 0xc5, 0x3a, 0x2e, 0x80, 0xb2, 0x99, 0x20, 0x9d, 0x8c, 0x04, 0x0f, 0x67,
0x7a, 0x15, 0xbb, 0x5e, 0x77, 0x01, 0xbc, 0xe5, 0xe1, 0xcc, 0x49, 0x00, 0x96, 0x3e, 0xfe, 0x93,
0xcf, 0xed, 0xc1, 0x46, 0x8c, 0x49, 0xc4, 0xa4, 0xd4, 0x57, 0x90, 0xef, 0xfe, 0x2a, 0x74, 0xfa,
0x0a, 0xfa, 0x79, 0xd0, 0x12, 0xed, 0x0f, 0x79, 0x0a, 0xdd, 0x32, 0x78, 0xc4, 0xac, 0xee, 0xaa,
0x96, 0x45, 0x6b, 0xb9, 0x21, 0x79, 0x6e, 0x1a, 0xa7, 0x5f, 0x0d, 0xe8, 0xe7, 0xe2, 0xaf, 0xc2,
0x74, 0xca, 0x38, 0x71, 0xa1, 0xbf, 0x1a, 0x25, 0x52, 0xa3, 0x58, 0xf7, 0xab, 0xfa, 0x4f, 0x89,
0x73, 0x1a, 0xc7, 0x06, 0x71, 0xa1, 0x5b, 0x2e, 0xc9, 0x8a, 0x96, 0x5a, 0x56, 0xac, 0x3b, 0xb7,
0x6e, 0x94, 0xd3, 0x78, 0x79, 0xef, 0xfa, 0xc6, 0x36, 0xbe, 0xdf, 0xd8, 0x8d, 0xcf, 0x73, 0xdb,
0xb8, 0x9e, 0xdb, 0xc6, 0xb7, 0xb9, 0x6d, 0xfc, 0x98, 0xdb, 0xc6, 0x97, 0x9f, 0x76, 0x63, 0xdc,
0xd6, 0x3f, 0x9b, 0x27, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x65, 0x2a, 0x0d, 0x36, 0xb1, 0x04,
0x00, 0x00,
// 688 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4f, 0x4f, 0x13, 0x41,
0x14, 0xef, 0xb6, 0xd0, 0x3f, 0xaf, 0x05, 0xea, 0x40, 0xcc, 0xb2, 0xe0, 0xa6, 0x2e, 0xd1, 0xa0,
0x91, 0x02, 0x35, 0x01, 0xc3, 0xc1, 0xa4, 0x52, 0x4c, 0x48, 0x50, 0xc8, 0x72, 0xf0, 0x62, 0xd2,
0x4c, 0xdb, 0xb1, 0xbb, 0x71, 0x3b, 0xb3, 0xce, 0xcc, 0x36, 0xe9, 0xcd, 0x8f, 0xe0, 0x87, 0xf0,
0xc3, 0x70, 0xf4, 0xe8, 0x51, 0xea, 0xc9, 0xab, 0x9f, 0xc0, 0xec, 0xec, 0x9f, 0x36, 0x2b, 0x18,
0x4d, 0xbc, 0xcd, 0xfb, 0xbd, 0xf7, 0x7b, 0xf3, 0xfe, 0xcc, 0x6f, 0xa0, 0x82, 0x7d, 0xb7, 0xe9,
0x73, 0x26, 0x19, 0x2a, 0x8d, 0xf7, 0x7b, 0x44, 0xe2, 0x7d, 0x63, 0x67, 0xe8, 0x4a, 0x27, 0xe8,
0x35, 0xfb, 0x6c, 0xb4, 0x3b, 0x64, 0x43, 0xb6, 0xab, 0xfc, 0xbd, 0xe0, 0x9d, 0xb2, 0x94, 0xa1,
0x4e, 0x11, 0xcf, 0x3a, 0x86, 0xd5, 0x0e, 0x19, 0xbb, 0x7d, 0x72, 0xe1, 0x05, 0x43, 0x97, 0x9e,
0xfb, 0xd2, 0x65, 0x54, 0xa0, 0x27, 0x80, 0x7c, 0x4e, 0xba, 0x42, 0x62, 0x2e, 0xbb, 0x9c, 0x7c,
0x08, 0x5c, 0x4e, 0x06, 0xba, 0xd6, 0xd0, 0xb6, 0xcb, 0x76, 0xdd, 0xe7, 0xe4, 0x32, 0x74, 0xd8,
0x31, 0x6e, 0x7d, 0xd6, 0x60, 0xc5, 0x26, 0x43, 0x57, 0x48, 0xc2, 0x43, 0x90, 0x08, 0x89, 0x74,
0x28, 0x8d, 0x09, 0x17, 0x2e, 0xa3, 0x8a, 0x56, 0xb1, 0x13, 0x13, 0x19, 0x50, 0x26, 0x74, 0xe0,
0x33, 0x97, 0x4a, 0x3d, 0xaf, 0x5c, 0xa9, 0x8d, 0xb6, 0x60, 0x89, 0x13, 0xc1, 0x02, 0xde, 0x27,
0x5d, 0x8a, 0x47, 0x44, 0x2f, 0xa8, 0x80, 0x5a, 0x02, 0xbe, 0xc6, 0x23, 0x82, 0x0e, 0xa0, 0xc4,
0xa2, 0x3a, 0xf5, 0x85, 0x86, 0xb6, 0x5d, 0x6d, 0x6d, 0x36, 0xe3, 0xee, 0x9b, 0x37, 0xf4, 0x62,
0x27, 0xc1, 0x56, 0x09, 0x16, 0x4f, 0x46, 0xbe, 0x9c, 0x58, 0x6d, 0x58, 0x3b, 0x73, 0x85, 0x6c,
0xd3, 0xc1, 0x1b, 0x2c, 0xfb, 0x8e, 0x4d, 0x84, 0xcf, 0xa8, 0x20, 0xe8, 0x11, 0x94, 0x06, 0x2a,
0x81, 0xd0, 0xb5, 0x46, 0x61, 0xbb, 0xda, 0x5a, 0xc9, 0x24, 0xb6, 0x13, 0xbf, 0xb5, 0x07, 0xc5,
0x08, 0x42, 0xcb, 0x90, 0x3f, 0xed, 0xc4, 0x3d, 0xe6, 0xdd, 0x0e, 0xba, 0x0b, 0x45, 0x87, 0x60,
0x4f, 0x3a, 0x71, 0x73, 0xb1, 0x65, 0x1d, 0x81, 0x7e, 0x11, 0x0f, 0xee, 0x98, 0x51, 0x89, 0x5d,
0x3a, 0x1b, 0x96, 0x09, 0x10, 0x27, 0x3e, 0xed, 0x44, 0x77, 0x57, 0xec, 0x39, 0xc4, 0xda, 0x80,
0xf5, 0x1b, 0xb8, 0x51, 0xd5, 0xd6, 0x3e, 0xac, 0xb4, 0x3d, 0x8f, 0xf5, 0xb1, 0x24, 0x7f, 0x9b,
0xef, 0x47, 0x1e, 0xea, 0x33, 0x4e, 0xdc, 0xfd, 0x21, 0x2c, 0x10, 0x3a, 0x4e, 0x5a, 0xdf, 0x4a,
0x5b, 0xcf, 0x06, 0x36, 0x4f, 0xe8, 0x58, 0x9c, 0x50, 0xc9, 0x27, 0xb6, 0x22, 0xa0, 0x87, 0x50,
0x1c, 0xb1, 0x80, 0x4a, 0xa1, 0xe7, 0x15, 0x75, 0x39, 0xa5, 0xbe, 0x0a, 0x61, 0x3b, 0xf6, 0xa2,
0x9d, 0xd9, 0x78, 0x0b, 0x2a, 0x70, 0x35, 0x33, 0xde, 0x4b, 0x9f, 0xf4, 0xd3, 0x11, 0xa3, 0x33,
0xa8, 0x62, 0x4a, 0x99, 0xc4, 0xc9, 0xaa, 0x43, 0xca, 0xe3, 0xdb, 0xcb, 0x6a, 0xcf, 0x82, 0xa3,
0xea, 0xe6, 0xe9, 0xc6, 0x21, 0x54, 0xd2, 0xba, 0x51, 0x1d, 0x0a, 0xef, 0xc9, 0x24, 0x5e, 0x5a,
0x78, 0x44, 0x6b, 0xb0, 0x38, 0xc6, 0x5e, 0x40, 0xe2, 0xa5, 0x45, 0xc6, 0x51, 0xfe, 0x99, 0x66,
0x3c, 0x87, 0x7a, 0x36, 0xf3, 0xbf, 0xf0, 0x2d, 0x07, 0x16, 0xd5, 0x18, 0xd0, 0x03, 0x58, 0xee,
0x27, 0xcb, 0xeb, 0xfa, 0x58, 0x3a, 0x31, 0x7f, 0x29, 0x45, 0x2f, 0xb0, 0x74, 0xd0, 0x06, 0x54,
0x1c, 0x26, 0x64, 0x14, 0x11, 0xeb, 0x23, 0x04, 0x12, 0x27, 0x27, 0x78, 0xd0, 0x65, 0xd4, 0x9b,
0x28, 0x6d, 0x94, 0xed, 0x72, 0x08, 0x9c, 0x53, 0x6f, 0x62, 0x71, 0x80, 0xd9, 0x1c, 0xff, 0xcb,
0x75, 0x0d, 0xa8, 0xfa, 0x84, 0x8f, 0x5c, 0x21, 0xd4, 0x0a, 0x22, 0x31, 0xce, 0x43, 0xad, 0x97,
0x50, 0x8b, 0x94, 0xcf, 0xd5, 0x7c, 0xd0, 0x01, 0x94, 0x93, 0x9f, 0x00, 0xe9, 0xe9, 0xae, 0x32,
0x9f, 0x83, 0x31, 0x7b, 0x21, 0x91, 0x20, 0x73, 0xad, 0x9f, 0x1a, 0xd4, 0xe6, 0xc5, 0x8b, 0xda,
0x50, 0x9b, 0xd7, 0x28, 0xca, 0x50, 0x8c, 0x7b, 0xa9, 0x7d, 0x93, 0x94, 0xad, 0xdc, 0x9e, 0x86,
0xda, 0x50, 0x4e, 0x1e, 0xc9, 0x5c, 0x2d, 0x19, 0xad, 0x18, 0xeb, 0xb7, 0xbe, 0x28, 0x2b, 0x87,
0xde, 0xc2, 0x9d, 0xdf, 0x84, 0x87, 0xee, 0xa7, 0x8c, 0xdb, 0x04, 0x6d, 0x58, 0x7f, 0x0a, 0x49,
0xb2, 0xbf, 0xd8, 0xbc, 0xba, 0x36, 0xb5, 0xaf, 0xd7, 0x66, 0xee, 0xe3, 0xd4, 0xd4, 0xae, 0xa6,
0xa6, 0xf6, 0x65, 0x6a, 0x6a, 0xdf, 0xa6, 0xa6, 0xf6, 0xe9, 0xbb, 0x99, 0xeb, 0x15, 0xd5, 0x0f,
0xfd, 0xf4, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0xad, 0x28, 0x13, 0x30, 0xe6, 0x05, 0x00, 0x00,
}

View File

@ -24,6 +24,11 @@ service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}
// DevicePluginOptions holds the options a device plugin passes in the
// `options` field of RegisterRequest.
message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
}
message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
@ -32,6 +37,8 @@ message RegisterRequest {
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}
message Empty {
@ -48,6 +55,11 @@ service DevicePlugin {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// PreStartContainer is called, if indicated by Device Plugin during registration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}
// ListAndWatch returns a stream of List of Devices
@ -71,6 +83,18 @@ message Device {
string health = 2;
}
// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
// IDs of the devices this request refers to (repeated string, field 1).
repeated string devicesIDs = 1;
}
// PreStartContainerResponse will be sent by the plugin in response to PreStartContainerRequest.
// It currently carries no fields.
message PreStartContainerResponse {
}
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to expose additional artifacts in a pod's

View File

@ -30,4 +30,8 @@ const (
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// KubeletPreStartContainerRPCTimeoutInSecs is the timeout duration in secs for the PreStartContainer RPC
KubeletPreStartContainerRPCTimeoutInSecs = 30
)
// SupportedVersions is a fixed-size array of version strings; it currently
// holds only "v1beta1".
// NOTE(review): presumably the device plugin API versions accepted at
// registration — confirm against the callers.
var SupportedVersions = [...]string{"v1beta1"}

View File

@ -601,8 +601,10 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
opts := &kubecontainer.RunContainerOptions{}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
devOpts := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if devOpts == nil {
devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if err != nil {
return nil, err
} else if devOpts == nil {
return opts, nil
}
opts.Devices = append(opts.Devices, devOpts.Devices...)

View File

@ -14,7 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
@ -39,7 +39,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/store:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",

View File

@ -26,7 +26,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
// Stub implementation for DevicePlugin.
@ -105,7 +105,7 @@ func (m *Stub) Stop() error {
}
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error {
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(10*time.Second),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
@ -120,6 +120,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag},
}
_, err = client.Register(context.Background(), reqt)
@ -129,6 +130,12 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
return nil
}
// PreStartContainer is the stub's handler for the PreStartContainer RPC. It
// logs the incoming request and replies with an empty response and a nil error;
// no device state is touched.
func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
	log.Printf("PreStartContainer, %+v", r)

	resp := new(pluginapi.PreStartContainerResponse)
	return resp, nil
}
// ListAndWatch lists devices and update that list according to the Update call
func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("ListAndWatch")

View File

@ -26,7 +26,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
// endpoint maps to a single registered device plugin. It is responsible
@ -36,6 +36,7 @@ type endpoint interface {
run()
stop()
allocate(devs []string) (*pluginapi.AllocateResponse, error)
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
getDevices() []pluginapi.Device
callback(resourceName string, added, updated, deleted []pluginapi.Device)
}
@ -182,6 +183,15 @@ func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, err
})
}
// preStartContainer sends a PreStartContainer gRPC request carrying devs to
// the device plugin and returns the plugin's response. The call is bounded by
// a timeout of pluginapi.KubeletPreStartContainerRPCTimeoutInSecs seconds.
func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
	timeout := pluginapi.KubeletPreStartContainerRPCTimeoutInSecs * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req := &pluginapi.PreStartContainerRequest{DevicesIDs: devs}
	return e.client.PreStartContainer(ctx, req)
}
// stop closes the endpoint's client connection. Any error returned by Close
// is discarded.
func (e *endpointImpl) stop() {
e.clientConn.Close()
}

View File

@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
var (

View File

@ -33,7 +33,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@ -85,6 +85,7 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping.
podDevices podDevices
store utilstore.Store
pluginOpts map[string]*pluginapi.DevicePluginOptions
}
type sourcesReadyStub struct{}
@ -112,6 +113,7 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
@ -201,6 +203,7 @@ func (m *ManagerImpl) checkpointFile() string {
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
glog.V(2).Infof("Starting Device Plugin manager")
fmt.Println("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
@ -286,8 +289,15 @@ func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
glog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
if r.Version != pluginapi.Version {
errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.Version)
var versionCompatible bool
for _, v := range pluginapi.SupportedVersions {
if r.Version == v {
versionCompatible = true
break
}
}
if !versionCompatible {
errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
glog.Infof("Bad registration request from device plugin with resource name %q: %v", r.ResourceName, errorString)
return &pluginapi.Empty{}, fmt.Errorf(errorString)
}
@ -340,8 +350,10 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
m.mutex.Lock()
if r.Options != nil {
m.pluginOpts[r.ResourceName] = r.Options
}
// Check for potential re-registration during the initialization of new endpoint,
// and skip updating if re-registration happens.
// TODO: simplify the part once we have a better way to handle registered devices
@ -590,11 +602,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
resource := string(k)
needed := int(v.Value())
glog.V(3).Infof("needs %d %s", needed, resource)
_, registeredResource := m.healthyDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Continues if this is neither an active device plugin resource nor
// a resource we have previously allocated.
if !registeredResource && !allocatedResource {
if !m.isDevicePluginResource(resource) {
continue
}
// Updates allocatedDevices to garbage collect any stranded resources
@ -610,6 +618,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
if allocDevices == nil || len(allocDevices) <= 0 {
continue
}
startRPCTime := time.Now()
// Manager.Allocate involves RPC calls to device plugin, which
// could be heavy-weight. Therefore we want to perform this operation outside
@ -659,10 +668,60 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
podUID := string(pod.UID)
contName := container.Name
for k := range container.Resources.Limits {
resource := string(k)
if !m.isDevicePluginResource(resource) {
continue
}
err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
if err != nil {
return nil, err
}
}
m.mutex.Lock()
defer m.mutex.Unlock()
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}
// callPreStartContainerIfNeeded issues the PreStartContainer RPC to the device
// plugin serving resource, provided that plugin asked for it at registration.
//
// It returns nil without contacting the plugin when no plugin options are
// cached for resource, or when the cached options have PreStartRequired unset.
// It returns an error when no devices are cached for
// <podUID, contName, resource>, when no endpoint is cached for resource, or
// when the RPC itself fails.
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
	var (
		e    endpoint
		devs []string
	)

	// Everything the RPC needs is gathered in one critical section. The
	// deferred Unlock covers every exit path, and the device IDs are copied
	// into a slice before the lock is released so the RPC below works on a
	// private snapshot rather than on the cached set.
	needed, err := func() (bool, error) {
		m.mutex.Lock()
		defer m.mutex.Unlock()

		opts, ok := m.pluginOpts[resource]
		if !ok {
			glog.V(4).Infof("Plugin options not found in cache for resource: %s. Skip PreStartContainer", resource)
			return false, nil
		}
		if !opts.PreStartRequired {
			glog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource, %v", resource)
			return false, nil
		}

		devices := m.podDevices.containerDevices(podUID, contName, resource)
		if devices == nil {
			return false, fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
		}

		ep, ok := m.endpoints[resource]
		if !ok {
			return false, fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
		}

		e = ep
		devs = devices.UnsortedList()
		return true, nil
	}()
	if err != nil || !needed {
		return err
	}

	// The manager lock is no longer held here, so a slow plugin does not
	// block other manager operations while the RPC is in flight.
	glog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
	if _, err := e.preStartContainer(devs); err != nil {
		return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
	}
	// TODO: Add metrics support for init RPC
	return nil
}
// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
@ -692,3 +751,14 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
node.SetAllocatableResource(newAllocatableResource)
}
}
// isDevicePluginResource reports whether resource is known to the device
// manager: either it has an entry in m.healthyDevices (an active device
// plugin resource) or an entry in m.allocatedDevices (a resource that was
// previously allocated).
//
// NOTE(review): both maps are read without taking m.mutex here — confirm
// that callers provide whatever synchronization is required.
func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
	_, isRegistered := m.healthyDevices[resource]
	_, wasAllocated := m.allocatedDevices[resource]
	return isRegistered || wasAllocated
}

View File

@ -18,7 +18,7 @@ package devicemanager
import (
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
@ -53,8 +53,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
}
// GetDeviceRunContainerOptions simply returns nil.
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
return nil
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
return nil, nil
}
// GetCapacity simply returns nil capacity and empty removed resource list.

View File

@ -33,7 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
utilstore "k8s.io/kubernetes/pkg/kubelet/util/store"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
@ -60,6 +60,7 @@ func TestNewManagerImpl(t *testing.T) {
defer os.RemoveAll(socketDir)
_, err = newManagerImpl(socketName)
require.NoError(t, err)
os.RemoveAll(socketDir)
}
func TestNewManagerImplStart(t *testing.T) {
@ -84,67 +85,68 @@ func TestDevicePluginReRegistration(t *testing.T) {
devsForRegistration := []*pluginapi.Device{
{ID: "Dev3", Health: pluginapi.Healthy},
}
for _, preStartContainerFlag := range []bool{false, true} {
expCallbackCount := int32(0)
callbackCount := int32(0)
callbackChan := make(chan int32)
callback := func(n string, a, u, r []pluginapi.Device) {
callbackCount++
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
expCallbackCount := int32(0)
callbackCount := int32(0)
callbackChan := make(chan int32)
callback := func(n string, a, u, r []pluginapi.Device) {
callbackCount++
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
t.FailNow()
}
callbackChan <- callbackCount
}
m, p1 := setup(t, devs, callback, socketName, pluginSocketName)
atomic.StoreInt32(&expCallbackCount, 1)
p1.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the first callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
callbackChan <- callbackCount
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
err = p2.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2)
p2.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
err = p3.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3)
p3.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
close(callbackChan)
}
m, p1 := setup(t, devs, callback, socketName, pluginSocketName)
atomic.StoreInt32(&expCallbackCount, 1)
p1.Register(socketName, testResourceName)
// Wait for the first callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
err = p2.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2)
p2.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
err = p3.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3)
p3.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
close(callbackChan)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) {
@ -359,9 +361,9 @@ func TestCheckpoint(t *testing.T) {
for podUID, containerDevices := range expectedPodDevices {
for conName, resources := range containerDevices {
for resource := range resources {
as.True(reflect.DeepEqual(
expectedPodDevices.containerDevices(podUID, conName, resource),
testManager.podDevices.containerDevices(podUID, conName, resource)))
expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
as.True(reflect.DeepEqual(expDevices, testDevices))
opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
as.Equal(len(opts1.Envs), len(opts2.Envs))
@ -388,6 +390,7 @@ func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
// MockEndpoint is a test double that is stored in the manager's endpoints map
// in place of a real device plugin endpoint.
type MockEndpoint struct {
// allocateFunc, when non-nil, is what allocate delegates to.
allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
// initChan receives the device IDs passed to each preStartContainer call.
initChan chan []string
}
// stop is a no-op for the mock endpoint.
func (m *MockEndpoint) stop() {}
@ -399,6 +402,11 @@ func (m *MockEndpoint) getDevices() []pluginapi.Device {
// callback is a no-op for the mock endpoint; all arguments are ignored.
func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}
// preStartContainer publishes devs on m.initChan so that a test can observe
// which devices the call was made with, then reports success with an empty
// response. The send blocks until initChan can accept the value.
func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
	m.initChan <- devs
	return new(pluginapi.PreStartContainerResponse), nil
}
func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if m.allocateFunc != nil {
return m.allocateFunc(devs)
@ -423,7 +431,7 @@ func makePod(limits v1.ResourceList) *v1.Pod {
}
}
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) *ManagerImpl {
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl {
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
testManager := &ManagerImpl{
socketdir: tmpDir,
@ -431,6 +439,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint),
pluginOpts: opts,
podDevices: make(podDevices),
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
@ -443,48 +452,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
}
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.AllocateResponse)
resp.Envs = make(map[string]string)
for _, dev := range devs {
switch dev {
case "dev1":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/aaa",
HostPath: "/dev/aaa",
Permissions: "mrw",
})
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/bbb",
HostPath: "/dev/bbb",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file1",
HostPath: "host_dir1/file1",
ReadOnly: true,
})
case "dev2":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/ccc",
HostPath: "/dev/ccc",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file2",
HostPath: "host_dir1/file2",
ReadOnly: true,
})
resp.Envs["key1"] = "val1"
}
}
return resp, nil
},
allocateFunc: allocateStubFunc(),
}
}
if res.resourceName == "domain2.com/resource2" {
@ -549,7 +517,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testPods := []*v1.Pod{
makePod(v1.ResourceList{
@ -604,7 +573,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
as.Nil(err)
if testCase.expectedContainerOptsLen == nil {
as.Nil(runContainerOpts)
} else {
@ -642,7 +612,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
podWithPluginResourcesInInitContainers := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -753,3 +724,104 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
// allocatable in nodeInfo is more than needed, should skip updating
as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}
// TestDevicePreStartContainer ensures that, when a device plugin has indicated
// that PreStartContainer is required, the device manager invokes
// preStartContainer on the endpoint with the devices allocated to the
// container. It also verifies that the resulting run-container options carry
// as many devices, mounts and envs as the allocate stub returns.
func TestDevicePreStartContainer(t *testing.T) {
	res1 := TestResource{
		resourceName:     "domain1.com/resource1",
		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
		devs:             []string{"dev1", "dev2"},
	}
	as := require.New(t)
	podsStub := activePodsStub{
		activePods: []*v1.Pod{},
	}
	tmpDir, err := ioutil.TempDir("", "checkpoint")
	as.NoError(err)
	defer os.RemoveAll(tmpDir)
	nodeInfo := getTestNodeInfo(v1.ResourceList{})

	// Mark the resource as requiring PreStartContainer before container start.
	pluginOpts := map[string]*pluginapi.DevicePluginOptions{
		res1.resourceName: {PreStartRequired: true},
	}
	testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)

	// Buffered by one so the mock's send inside preStartContainer completes
	// even though the test only reads the channel after the call returns.
	ch := make(chan []string, 1)
	testManager.endpoints[res1.resourceName] = &MockEndpoint{
		initChan:     ch,
		allocateFunc: allocateStubFunc(),
	}

	pod := makePod(v1.ResourceList{
		v1.ResourceName(res1.resourceName): res1.resourceQuantity})
	podsStub.updateActivePods([]*v1.Pod{pod})

	err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
	as.NoError(err)
	runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
	as.NoError(err)
	// Fail with a clear message instead of a nil dereference further down.
	as.NotNil(runContainerOpts)

	var initializedDevs []string
	select {
	case initializedDevs = <-ch:
	case <-time.After(time.Second):
		t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
	}

	as.Contains(initializedDevs, "dev1")
	as.Contains(initializedDevs, "dev2")
	as.Len(initializedDevs, len(res1.devs))

	// Assertions take (expected, actual) so failure messages read correctly.
	expectedResp, err := allocateStubFunc()([]string{"dev1", "dev2"})
	as.NoError(err)
	as.Equal(len(expectedResp.Devices), len(runContainerOpts.Devices))
	as.Equal(len(expectedResp.Mounts), len(runContainerOpts.Mounts))
	as.Equal(len(expectedResp.Envs), len(runContainerOpts.Envs))
}
// allocateStubFunc returns the allocate callback used by the mock endpoints.
// For every requested device ID the callback appends fixed artifacts to the
// response:
//   - "dev1": devices /dev/aaa and /dev/bbb, plus a read-only mount of file1;
//   - "dev2": device /dev/ccc, a read-only mount of file2, and env key1=val1.
//
// Any other ID contributes nothing. The callback never returns an error, and
// the response's Envs map is always non-nil.
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
	return func(devs []string) (*pluginapi.AllocateResponse, error) {
		out := &pluginapi.AllocateResponse{Envs: map[string]string{}}

		// addDevice exposes a host device node at the same path in the container.
		addDevice := func(devPath string) {
			out.Devices = append(out.Devices, &pluginapi.DeviceSpec{
				ContainerPath: devPath,
				HostPath:      devPath,
				Permissions:   "mrw",
			})
		}
		// addMount records a read-only mount from hostPath to containerPath.
		addMount := func(containerPath, hostPath string) {
			out.Mounts = append(out.Mounts, &pluginapi.Mount{
				ContainerPath: containerPath,
				HostPath:      hostPath,
				ReadOnly:      true,
			})
		}

		for _, id := range devs {
			if id == "dev1" {
				addDevice("/dev/aaa")
				addDevice("/dev/bbb")
				addMount("/container_dir1/file1", "host_dir1/file1")
			} else if id == "dev2" {
				addDevice("/dev/ccc")
				addMount("/container_dir1/file2", "host_dir1/file2")
				out.Envs["key1"] = "val1"
			}
		}
		return out, nil
	}
}

View File

@ -20,7 +20,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -117,7 +117,9 @@ func (pdev podDevices) devices() map[string]sets.String {
if _, exists := ret[resource]; !exists {
ret[resource] = sets.NewString()
}
ret[resource] = ret[resource].Union(devices.deviceIds)
if devices.allocResp != nil {
ret[resource] = ret[resource].Union(devices.deviceIds)
}
}
}
}

View File

@ -18,7 +18,7 @@ package devicemanager
import (
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -51,7 +51,7 @@ type Manager interface {
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
@ -77,7 +77,7 @@ const (
errFailedToDialDevicePlugin = "failed to dial device plugin:"
// errUnsupportedVersion is the error raised when the device plugin uses an API version not
// supported by the Kubelet registry
errUnsupportedVersion = "requested API version %q is not supported by kubelet. Supported version is %q"
errUnsupportedVersion = "requested API version %q is not supported by kubelet. Supported versions are %q"
// errDevicePluginAlreadyExists is the error raised when a device plugin with the
// same Resource Name tries to register itself
errDevicePluginAlreadyExists = "another device plugin already registered this Resource Name"

View File

@ -155,6 +155,7 @@ func getGPUsAvailable(f *framework.Framework) int64 {
func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *framework.ContainerResourceGatherer {
Skip("Temporarily disable the test till we update the device plugin image")
// Skip the test if the base image is not COS.
// TODO: Add support for other base images.
// CUDA apps require host mounts which is not portable across base images (yet).

View File

@ -30,7 +30,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",

View File

@ -33,7 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/test/e2e/framework"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
dm "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
. "github.com/onsi/ginkgo"
@ -75,7 +75,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D
framework.ExpectNoError(err)
By("Register resources")
err = dp1.Register(pluginapi.KubeletSocket, resourceName)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
framework.ExpectNoError(err)
By("Waiting for the resource exported by the stub device plugin to become available on the local node")
@ -112,7 +112,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D
err = dp1.Start()
framework.ExpectNoError(err)
err = dp1.Register(pluginapi.KubeletSocket, resourceName)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
framework.ExpectNoError(err)
By("Waiting for resource to become available on the local node after re-registration")