Merge pull request #1437 from teawater/vmcache_grpc

Make factory can control VMCache server
This commit is contained in:
Fupan Li 2019-04-11 14:01:47 +08:00 committed by GitHub
commit 6d81e44670
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 619 additions and 30 deletions

View File

@ -13,6 +13,7 @@ import (
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"syscall" "syscall"
"time"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
pb "github.com/kata-containers/runtime/protocols/cache" pb "github.com/kata-containers/runtime/protocols/cache"
@ -43,6 +44,7 @@ var factoryCLICommand = cli.Command{
type cacheServer struct { type cacheServer struct {
rpc *grpc.Server rpc *grpc.Server
factory vc.Factory factory vc.Factory
done chan struct{}
} }
var jsonVMConfig *pb.GrpcVMConfig var jsonVMConfig *pb.GrpcVMConfig
@ -74,6 +76,29 @@ func (s *cacheServer) GetBaseVM(ctx context.Context, empty *types.Empty) (*pb.Gr
return vm.ToGrpc(config) return vm.ToGrpc(config)
} }
func (s *cacheServer) quit() {
s.rpc.GracefulStop()
close(s.done)
}
// Quit will stop VMCache server after 1 second.
func (s *cacheServer) Quit(ctx context.Context, empty *types.Empty) (*types.Empty, error) {
go func() {
kataLog.Info("VM cache server will stop after 1 second")
time.Sleep(time.Second)
s.quit()
}()
return nil, nil
}
func (s *cacheServer) Status(ctx context.Context, empty *types.Empty) (*pb.GrpcStatus, error) {
stat := pb.GrpcStatus{
Pid: int64(os.Getpid()),
Vmstatus: s.factory.GetVMStatus(),
}
return &stat, nil
}
func getUnixListener(path string) (net.Listener, error) { func getUnixListener(path string) (net.Listener, error) {
err := os.MkdirAll(filepath.Dir(path), 0755) err := os.MkdirAll(filepath.Dir(path), 0755)
if err != nil { if err != nil {
@ -102,8 +127,8 @@ var handledSignals = []os.Signal{
syscall.SIGPIPE, syscall.SIGPIPE,
} }
func handleSignals(s *cacheServer, signals chan os.Signal) chan struct{} { func handleSignals(s *cacheServer, signals chan os.Signal) {
done := make(chan struct{}, 1) s.done = make(chan struct{}, 1)
go func() { go func() {
for { for {
sig := <-signals sig := <-signals
@ -112,13 +137,11 @@ func handleSignals(s *cacheServer, signals chan os.Signal) chan struct{} {
case unix.SIGPIPE: case unix.SIGPIPE:
continue continue
default: default:
s.rpc.GracefulStop() s.quit()
close(done)
return return
} }
} }
}() }()
return done
} }
var initFactoryCommand = cli.Command{ var initFactoryCommand = cli.Command{
@ -168,13 +191,13 @@ var initFactoryCommand = cli.Command{
defer l.Close() defer l.Close()
signals := make(chan os.Signal, 8) signals := make(chan os.Signal, 8)
done := handleSignals(s, signals) handleSignals(s, signals)
signal.Notify(signals, handledSignals...) signal.Notify(signals, handledSignals...)
kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start") kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start")
s.rpc.Serve(l) s.rpc.Serve(l)
<-done <-s.done
kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop") kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop")
return nil return nil
@ -221,7 +244,19 @@ var destroyFactoryCommand = cli.Command{
return errors.New("invalid runtime config") return errors.New("invalid runtime config")
} }
if runtimeConfig.FactoryConfig.Template { if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", runtimeConfig.FactoryConfig.VMCacheEndpoint), grpc.WithInsecure())
if err != nil {
return errors.Wrapf(err, "failed to connect %q", runtimeConfig.FactoryConfig.VMCacheEndpoint)
}
defer conn.Close()
_, err = pb.NewCacheServiceClient(conn).Quit(ctx, &types.Empty{})
if err != nil {
return errors.Wrapf(err, "failed to call gRPC Quit")
}
// Wait VMCache server stop
time.Sleep(time.Second)
} else if runtimeConfig.FactoryConfig.Template {
factoryConfig := vf.Config{ factoryConfig := vf.Config{
Template: true, Template: true,
VMConfig: vc.VMConfig{ VMConfig: vc.VMConfig{
@ -259,6 +294,23 @@ var statusFactoryCommand = cli.Command{
return errors.New("invalid runtime config") return errors.New("invalid runtime config")
} }
if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", runtimeConfig.FactoryConfig.VMCacheEndpoint), grpc.WithInsecure())
if err != nil {
fmt.Fprintln(defaultOutputFile, errors.Wrapf(err, "failed to connect %q", runtimeConfig.FactoryConfig.VMCacheEndpoint))
} else {
defer conn.Close()
status, err := pb.NewCacheServiceClient(conn).Status(ctx, &types.Empty{})
if err != nil {
fmt.Fprintln(defaultOutputFile, errors.Wrapf(err, "failed to call gRPC Status\n"))
} else {
fmt.Fprintf(defaultOutputFile, "VM cache server pid = %d\n", status.Pid)
for _, vs := range status.Vmstatus {
fmt.Fprintf(defaultOutputFile, "VM pid = %d Cpu = %d Memory = %dMiB\n", vs.Pid, vs.Cpu, vs.Memory)
}
}
}
}
if runtimeConfig.FactoryConfig.Template { if runtimeConfig.FactoryConfig.Template {
factoryConfig := vf.Config{ factoryConfig := vf.Config{
Template: true, Template: true,

View File

@ -10,6 +10,8 @@
It has these top-level messages: It has these top-level messages:
GrpcVMConfig GrpcVMConfig
GrpcVM GrpcVM
GrpcStatus
GrpcVMStatus
*/ */
package cache package cache
@ -124,9 +126,67 @@ func (m *GrpcVM) GetCpuDelta() uint32 {
return 0 return 0
} }
type GrpcStatus struct {
Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"`
Vmstatus []*GrpcVMStatus `protobuf:"bytes,2,rep,name=vmstatus" json:"vmstatus,omitempty"`
}
func (m *GrpcStatus) Reset() { *m = GrpcStatus{} }
func (m *GrpcStatus) String() string { return proto.CompactTextString(m) }
func (*GrpcStatus) ProtoMessage() {}
func (*GrpcStatus) Descriptor() ([]byte, []int) { return fileDescriptorCache, []int{2} }
func (m *GrpcStatus) GetPid() int64 {
if m != nil {
return m.Pid
}
return 0
}
func (m *GrpcStatus) GetVmstatus() []*GrpcVMStatus {
if m != nil {
return m.Vmstatus
}
return nil
}
type GrpcVMStatus struct {
Pid int64 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"`
Cpu uint32 `protobuf:"varint,2,opt,name=cpu,proto3" json:"cpu,omitempty"`
Memory uint32 `protobuf:"varint,3,opt,name=memory,proto3" json:"memory,omitempty"`
}
func (m *GrpcVMStatus) Reset() { *m = GrpcVMStatus{} }
func (m *GrpcVMStatus) String() string { return proto.CompactTextString(m) }
func (*GrpcVMStatus) ProtoMessage() {}
func (*GrpcVMStatus) Descriptor() ([]byte, []int) { return fileDescriptorCache, []int{3} }
func (m *GrpcVMStatus) GetPid() int64 {
if m != nil {
return m.Pid
}
return 0
}
func (m *GrpcVMStatus) GetCpu() uint32 {
if m != nil {
return m.Cpu
}
return 0
}
func (m *GrpcVMStatus) GetMemory() uint32 {
if m != nil {
return m.Memory
}
return 0
}
func init() { func init() {
proto.RegisterType((*GrpcVMConfig)(nil), "cache.GrpcVMConfig") proto.RegisterType((*GrpcVMConfig)(nil), "cache.GrpcVMConfig")
proto.RegisterType((*GrpcVM)(nil), "cache.GrpcVM") proto.RegisterType((*GrpcVM)(nil), "cache.GrpcVM")
proto.RegisterType((*GrpcStatus)(nil), "cache.GrpcStatus")
proto.RegisterType((*GrpcVMStatus)(nil), "cache.GrpcVMStatus")
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -142,6 +202,8 @@ const _ = grpc.SupportPackageIsVersion4
type CacheServiceClient interface { type CacheServiceClient interface {
Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error) Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error)
GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error) GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error)
Status(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcStatus, error)
Quit(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error)
} }
type cacheServiceClient struct { type cacheServiceClient struct {
@ -170,11 +232,31 @@ func (c *cacheServiceClient) GetBaseVM(ctx context.Context, in *google_protobuf.
return out, nil return out, nil
} }
func (c *cacheServiceClient) Status(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcStatus, error) {
out := new(GrpcStatus)
err := grpc.Invoke(ctx, "/cache.CacheService/Status", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *cacheServiceClient) Quit(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
out := new(google_protobuf.Empty)
err := grpc.Invoke(ctx, "/cache.CacheService/Quit", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for CacheService service // Server API for CacheService service
type CacheServiceServer interface { type CacheServiceServer interface {
Config(context.Context, *google_protobuf.Empty) (*GrpcVMConfig, error) Config(context.Context, *google_protobuf.Empty) (*GrpcVMConfig, error)
GetBaseVM(context.Context, *google_protobuf.Empty) (*GrpcVM, error) GetBaseVM(context.Context, *google_protobuf.Empty) (*GrpcVM, error)
Status(context.Context, *google_protobuf.Empty) (*GrpcStatus, error)
Quit(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error)
} }
func RegisterCacheServiceServer(s *grpc.Server, srv CacheServiceServer) { func RegisterCacheServiceServer(s *grpc.Server, srv CacheServiceServer) {
@ -217,6 +299,42 @@ func _CacheService_GetBaseVM_Handler(srv interface{}, ctx context.Context, dec f
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _CacheService_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(google_protobuf.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CacheServiceServer).Status(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/cache.CacheService/Status",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CacheServiceServer).Status(ctx, req.(*google_protobuf.Empty))
}
return interceptor(ctx, in, info, handler)
}
func _CacheService_Quit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(google_protobuf.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CacheServiceServer).Quit(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/cache.CacheService/Quit",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CacheServiceServer).Quit(ctx, req.(*google_protobuf.Empty))
}
return interceptor(ctx, in, info, handler)
}
var _CacheService_serviceDesc = grpc.ServiceDesc{ var _CacheService_serviceDesc = grpc.ServiceDesc{
ServiceName: "cache.CacheService", ServiceName: "cache.CacheService",
HandlerType: (*CacheServiceServer)(nil), HandlerType: (*CacheServiceServer)(nil),
@ -229,6 +347,14 @@ var _CacheService_serviceDesc = grpc.ServiceDesc{
MethodName: "GetBaseVM", MethodName: "GetBaseVM",
Handler: _CacheService_GetBaseVM_Handler, Handler: _CacheService_GetBaseVM_Handler,
}, },
{
MethodName: "Status",
Handler: _CacheService_Status_Handler,
},
{
MethodName: "Quit",
Handler: _CacheService_Quit_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "cache.proto", Metadata: "cache.proto",
@ -320,6 +446,74 @@ func (m *GrpcVM) MarshalTo(dAtA []byte) (int, error) {
return i, nil return i, nil
} }
func (m *GrpcStatus) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GrpcStatus) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Pid != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintCache(dAtA, i, uint64(m.Pid))
}
if len(m.Vmstatus) > 0 {
for _, msg := range m.Vmstatus {
dAtA[i] = 0x12
i++
i = encodeVarintCache(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *GrpcVMStatus) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GrpcVMStatus) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Pid != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintCache(dAtA, i, uint64(m.Pid))
}
if m.Cpu != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintCache(dAtA, i, uint64(m.Cpu))
}
if m.Memory != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintCache(dAtA, i, uint64(m.Memory))
}
return i, nil
}
func encodeVarintCache(dAtA []byte, offset int, v uint64) int { func encodeVarintCache(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 { for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80) dAtA[offset] = uint8(v&0x7f | 0x80)
@ -373,6 +567,36 @@ func (m *GrpcVM) Size() (n int) {
return n return n
} }
func (m *GrpcStatus) Size() (n int) {
var l int
_ = l
if m.Pid != 0 {
n += 1 + sovCache(uint64(m.Pid))
}
if len(m.Vmstatus) > 0 {
for _, e := range m.Vmstatus {
l = e.Size()
n += 1 + l + sovCache(uint64(l))
}
}
return n
}
func (m *GrpcVMStatus) Size() (n int) {
var l int
_ = l
if m.Pid != 0 {
n += 1 + sovCache(uint64(m.Pid))
}
if m.Cpu != 0 {
n += 1 + sovCache(uint64(m.Cpu))
}
if m.Memory != 0 {
n += 1 + sovCache(uint64(m.Memory))
}
return n
}
func sovCache(x uint64) (n int) { func sovCache(x uint64) (n int) {
for { for {
n++ n++
@ -713,6 +937,213 @@ func (m *GrpcVM) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *GrpcStatus) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GrpcStatus: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GrpcStatus: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Pid", wireType)
}
m.Pid = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Pid |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Vmstatus", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthCache
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Vmstatus = append(m.Vmstatus, &GrpcVMStatus{})
if err := m.Vmstatus[len(m.Vmstatus)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCache(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCache
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *GrpcVMStatus) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GrpcVMStatus: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GrpcVMStatus: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Pid", wireType)
}
m.Pid = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Pid |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Cpu", wireType)
}
m.Cpu = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Cpu |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Memory", wireType)
}
m.Memory = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCache
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Memory |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipCache(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCache
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipCache(dAtA []byte) (n int, err error) { func skipCache(dAtA []byte) (n int, err error) {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0
@ -821,24 +1252,29 @@ var (
func init() { proto.RegisterFile("cache.proto", fileDescriptorCache) } func init() { proto.RegisterFile("cache.proto", fileDescriptorCache) }
var fileDescriptorCache = []byte{ var fileDescriptorCache = []byte{
// 299 bytes of a gzipped FileDescriptorProto // 374 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xd1, 0x6a, 0xc2, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x51, 0xcd, 0x0a, 0xda, 0x40,
0x18, 0x85, 0x49, 0xab, 0xdd, 0xfc, 0xad, 0x63, 0x64, 0x20, 0xa1, 0x83, 0x52, 0xbc, 0xea, 0x55, 0x10, 0x66, 0x13, 0x4d, 0x75, 0x8c, 0xa5, 0xdd, 0x82, 0x2c, 0x16, 0x42, 0xc8, 0x29, 0xa7, 0x08,
0x05, 0xc7, 0x1e, 0x60, 0xda, 0xe1, 0xcd, 0x84, 0xd1, 0x31, 0xef, 0x6b, 0x1a, 0x6b, 0xc1, 0x9a, 0x4a, 0x7b, 0xaf, 0x5a, 0x84, 0x52, 0x69, 0x1b, 0xa9, 0xf7, 0x18, 0xd7, 0x18, 0x30, 0xee, 0x92,
0x10, 0xd3, 0xb1, 0xbe, 0xd8, 0x9e, 0x61, 0x97, 0x7b, 0x84, 0xd1, 0x27, 0x19, 0x4d, 0xbb, 0xa2, 0x6c, 0xa4, 0x79, 0xb1, 0x3e, 0x43, 0x8f, 0x7d, 0x84, 0xe2, 0xa1, 0xcf, 0x51, 0x76, 0xb3, 0x86,
0x17, 0xde, 0xe5, 0x9c, 0xff, 0x9c, 0x43, 0xf8, 0x60, 0x48, 0x63, 0xba, 0x63, 0x81, 0x90, 0x5c, 0x08, 0xcd, 0x6d, 0x66, 0xbe, 0x9f, 0xfd, 0x76, 0x06, 0x46, 0x71, 0x14, 0x9f, 0x69, 0xc0, 0x73,
0x71, 0xdc, 0xd7, 0xc2, 0xb9, 0x4f, 0x39, 0x4f, 0xf7, 0x6c, 0xaa, 0xcd, 0x4d, 0xb1, 0x9d, 0xb2, 0x26, 0x18, 0xee, 0xab, 0x66, 0xfa, 0x36, 0x61, 0x2c, 0xb9, 0xd0, 0x99, 0x1a, 0x1e, 0xca, 0xd3,
0x5c, 0xa8, 0xb2, 0xc9, 0x4c, 0x42, 0xb0, 0x97, 0x52, 0xd0, 0xf5, 0x6a, 0xc1, 0x0f, 0xdb, 0x2c, 0x8c, 0x66, 0x5c, 0x54, 0x35, 0xc7, 0x5b, 0x83, 0xbd, 0xc9, 0x79, 0xbc, 0xdf, 0xae, 0xd8, 0xf5,
0xc5, 0x18, 0x7a, 0x61, 0xac, 0x62, 0x82, 0x3c, 0xe4, 0xdb, 0x91, 0x7e, 0x63, 0x0f, 0x86, 0x4f, 0x94, 0x26, 0x18, 0x43, 0x6f, 0x1d, 0x89, 0x88, 0x20, 0x17, 0xf9, 0x76, 0xa8, 0x6a, 0xec, 0xc2,
0x29, 0x3b, 0xa8, 0x26, 0x42, 0x0c, 0x7d, 0x3a, 0xb5, 0x26, 0x5f, 0x08, 0xac, 0x66, 0x06, 0xdf, 0xe8, 0x43, 0x42, 0xaf, 0xa2, 0xa6, 0x10, 0x43, 0x41, 0xed, 0x91, 0xf7, 0x13, 0x81, 0x55, 0xdb,
0x80, 0x91, 0x25, 0xba, 0x3e, 0x88, 0x8c, 0x2c, 0xc1, 0x2e, 0xc0, 0xae, 0x14, 0x4c, 0x7e, 0x64, 0xe0, 0x97, 0x60, 0xa4, 0x47, 0x25, 0x1f, 0x86, 0x46, 0x7a, 0xc4, 0x0e, 0xc0, 0xb9, 0xe2, 0x34,
0x47, 0x2e, 0xdb, 0xee, 0x89, 0x83, 0x1d, 0xb8, 0x16, 0x92, 0x7f, 0x96, 0xaf, 0x59, 0x42, 0x4c, 0xbf, 0xa5, 0x05, 0xcb, 0xb5, 0xb6, 0x35, 0xc1, 0x53, 0x18, 0xf0, 0x9c, 0xfd, 0xa8, 0xbe, 0xa6,
0x0f, 0xf9, 0x66, 0xd4, 0xe9, 0xee, 0xf6, 0x1e, 0xbd, 0x90, 0x9e, 0x5e, 0xec, 0x34, 0xbe, 0x05, 0x47, 0x62, 0xba, 0xc8, 0x37, 0xc3, 0xa6, 0x6f, 0xb0, 0xef, 0xe1, 0x67, 0xd2, 0x53, 0x8e, 0x4d,
0x93, 0x8a, 0x82, 0xf4, 0x3d, 0xe4, 0x8f, 0xa2, 0xfa, 0x89, 0xc7, 0x60, 0xe5, 0x2c, 0xe7, 0xb2, 0x8f, 0x5f, 0x81, 0x19, 0xf3, 0x92, 0xf4, 0x5d, 0xe4, 0x8f, 0x43, 0x59, 0xe2, 0x09, 0x58, 0x19,
0x24, 0x96, 0x36, 0x5b, 0x55, 0xaf, 0x50, 0x51, 0x84, 0x6c, 0xaf, 0x62, 0x72, 0xa5, 0x2f, 0x9d, 0xcd, 0x58, 0x5e, 0x11, 0x4b, 0x0d, 0x75, 0x27, 0x5d, 0x62, 0x5e, 0xae, 0xe9, 0x45, 0x44, 0xe4,
0x9e, 0x95, 0x60, 0x2f, 0x6a, 0x48, 0x6f, 0xf5, 0x77, 0x28, 0xc3, 0x8f, 0x60, 0xb5, 0x20, 0xc6, 0x85, 0x42, 0x9a, 0xde, 0xfb, 0x02, 0x20, 0x73, 0xef, 0x44, 0x24, 0xca, 0x42, 0x7a, 0x72, 0x1d,
0x41, 0x83, 0x2d, 0xf8, 0xc7, 0x16, 0x3c, 0xd7, 0xd8, 0x9c, 0xbb, 0xa0, 0x41, 0x7c, 0x46, 0x6d, 0xde, 0x0c, 0x65, 0x89, 0x67, 0x30, 0xb8, 0x65, 0x85, 0x42, 0x89, 0xe1, 0x9a, 0xfe, 0x68, 0xfe,
0x06, 0x83, 0x25, 0x53, 0xf3, 0xf8, 0xc8, 0xd6, 0xab, 0x8b, 0xcd, 0xd1, 0x59, 0x73, 0x6e, 0x7f, 0x26, 0xa8, 0x57, 0x5c, 0x7f, 0xb7, 0x16, 0x86, 0x0d, 0xc9, 0xfb, 0xf4, 0xd8, 0x67, 0xa7, 0xa5,
0x57, 0x2e, 0xfa, 0xa9, 0x5c, 0xf4, 0x5b, 0xb9, 0x68, 0x63, 0xe9, 0xf0, 0xc3, 0x5f, 0x00, 0x00, 0x0e, 0x6e, 0xfc, 0x2f, 0xb8, 0xd9, 0x0e, 0x3e, 0xff, 0x8b, 0xc0, 0x5e, 0xc9, 0xc7, 0x76, 0x72,
0x00, 0xff, 0xff, 0xb5, 0xef, 0x80, 0xc1, 0xc1, 0x01, 0x00, 0x00, 0x59, 0x31, 0xc5, 0xef, 0xc0, 0xd2, 0x67, 0x9a, 0x04, 0xf5, 0x51, 0x83, 0xc7, 0x51, 0x83, 0x8f,
0xf2, 0xa8, 0xd3, 0xe7, 0x74, 0x9a, 0x3c, 0x87, 0xe1, 0x86, 0x8a, 0x65, 0x54, 0xd0, 0xfd, 0xb6,
0x53, 0x39, 0x7e, 0x52, 0xe2, 0x05, 0x58, 0xfa, 0x07, 0x5d, 0x82, 0xd7, 0x2d, 0x81, 0xa6, 0xbe,
0x87, 0xde, 0xb7, 0x32, 0x15, 0x9d, 0x92, 0x8e, 0xf9, 0xd2, 0xfe, 0x75, 0x77, 0xd0, 0xef, 0xbb,
0x83, 0xfe, 0xdc, 0x1d, 0x74, 0xb0, 0x14, 0xba, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x40, 0x5e,
0x19, 0x6e, 0xcc, 0x02, 0x00, 0x00,
} }

View File

@ -13,6 +13,8 @@ import "google/protobuf/empty.proto";
service CacheService { service CacheService {
rpc Config(google.protobuf.Empty) returns (GrpcVMConfig); rpc Config(google.protobuf.Empty) returns (GrpcVMConfig);
rpc GetBaseVM(google.protobuf.Empty) returns (GrpcVM); rpc GetBaseVM(google.protobuf.Empty) returns (GrpcVM);
rpc Status(google.protobuf.Empty) returns (GrpcStatus);
rpc Quit(google.protobuf.Empty) returns (google.protobuf.Empty);
} }
message GrpcVMConfig { message GrpcVMConfig {
@ -33,3 +35,16 @@ message GrpcVM {
uint32 cpuDelta = 7; uint32 cpuDelta = 7;
} }
message GrpcStatus {
int64 pid = 1;
repeated GrpcVMStatus vmstatus = 2;
}
message GrpcVMStatus {
int64 pid = 1;
uint32 cpu = 2;
uint32 memory = 3;
}

View File

@ -5,13 +5,20 @@
package virtcontainers package virtcontainers
import "context" import (
"context"
pb "github.com/kata-containers/runtime/protocols/cache"
)
// Factory controls how a new VM is created. // Factory controls how a new VM is created.
type Factory interface { type Factory interface {
// Config returns base factory config. // Config returns base factory config.
Config() VMConfig Config() VMConfig
// GetVMStatus returns the status of the paused VM created by the base factory.
GetVMStatus() []*pb.GrpcVMStatus
// GetVM gets a new VM from the factory. // GetVM gets a new VM from the factory.
GetVM(ctx context.Context, config VMConfig) (*VM, error) GetVM(ctx context.Context, config VMConfig) (*VM, error)

View File

@ -8,6 +8,7 @@ package base
import ( import (
"context" "context"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
) )
@ -20,6 +21,9 @@ type FactoryBase interface {
// Config returns base factory config. // Config returns base factory config.
Config() vc.VMConfig Config() vc.VMConfig
// GetVMStatus returns the status of the paused VM created by the base factory.
GetVMStatus() []*pb.GrpcVMStatus
// GetBaseVM returns a paused VM created by the base factory. // GetBaseVM returns a paused VM created by the base factory.
GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error)

View File

@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"sync" "sync"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
"github.com/kata-containers/runtime/virtcontainers/factory/base" "github.com/kata-containers/runtime/virtcontainers/factory/base"
) )
@ -22,6 +23,9 @@ type cache struct {
closed chan<- int closed chan<- int
wg sync.WaitGroup wg sync.WaitGroup
closeOnce sync.Once closeOnce sync.Once
vmm map[*vc.VM]interface{}
vmmLock sync.RWMutex
} }
// New creates a new cached vm factory. // New creates a new cached vm factory.
@ -32,7 +36,12 @@ func New(ctx context.Context, count uint, b base.FactoryBase) base.FactoryBase {
cacheCh := make(chan *vc.VM) cacheCh := make(chan *vc.VM)
closed := make(chan int, count) closed := make(chan int, count)
c := cache{base: b, cacheCh: cacheCh, closed: closed} c := cache{
base: b,
cacheCh: cacheCh,
closed: closed,
vmm: make(map[*vc.VM]interface{}),
}
for i := 0; i < int(count); i++ { for i := 0; i < int(count); i++ {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
@ -43,10 +52,16 @@ func New(ctx context.Context, count uint, b base.FactoryBase) base.FactoryBase {
c.CloseFactory(ctx) c.CloseFactory(ctx)
return return
} }
c.addToVmm(vm)
select { select {
case cacheCh <- vm: case cacheCh <- vm:
// Because vm will not be relased or changed
// by cacheServer.GetBaseVM or removeFromVmm.
// So removeFromVmm can be called after vm send to cacheCh.
c.removeFromVmm(vm)
case <-closed: case <-closed:
c.removeFromVmm(vm)
vm.Stop() vm.Stop()
c.wg.Done() c.wg.Done()
return return
@ -57,11 +72,39 @@ func New(ctx context.Context, count uint, b base.FactoryBase) base.FactoryBase {
return &c return &c
} }
func (c *cache) addToVmm(vm *vc.VM) {
c.vmmLock.Lock()
defer c.vmmLock.Unlock()
c.vmm[vm] = nil
}
func (c *cache) removeFromVmm(vm *vc.VM) {
c.vmmLock.Lock()
defer c.vmmLock.Unlock()
delete(c.vmm, vm)
}
// Config returns cache vm factory's base factory config. // Config returns cache vm factory's base factory config.
func (c *cache) Config() vc.VMConfig { func (c *cache) Config() vc.VMConfig {
return c.base.Config() return c.base.Config()
} }
// GetVMStatus returns the status of the cached VMs.
func (c *cache) GetVMStatus() []*pb.GrpcVMStatus {
vs := []*pb.GrpcVMStatus{}
c.vmmLock.RLock()
defer c.vmmLock.RUnlock()
for vm := range c.vmm {
vs = append(vs, vm.GetVMStatus())
}
return vs
}
// GetBaseVM returns a base VM from cache factory's base factory. // GetBaseVM returns a base VM from cache factory's base factory.
func (c *cache) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) { func (c *cache) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) {
vm, ok := <-c.cacheCh vm, ok := <-c.cacheCh

View File

@ -9,6 +9,7 @@ package direct
import ( import (
"context" "context"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
"github.com/kata-containers/runtime/virtcontainers/factory/base" "github.com/kata-containers/runtime/virtcontainers/factory/base"
) )
@ -46,3 +47,8 @@ func (d *direct) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, err
// CloseFactory closes the direct vm factory. // CloseFactory closes the direct vm factory.
func (d *direct) CloseFactory(ctx context.Context) { func (d *direct) CloseFactory(ctx context.Context) {
} }
// GetVMStatus is not supported
func (d *direct) GetVMStatus() []*pb.GrpcVMStatus {
panic("ERROR: package direct does not support GetVMStatus")
}

View File

@ -9,6 +9,7 @@ import (
"context" "context"
"fmt" "fmt"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
"github.com/kata-containers/runtime/virtcontainers/factory/base" "github.com/kata-containers/runtime/virtcontainers/factory/base"
"github.com/kata-containers/runtime/virtcontainers/factory/cache" "github.com/kata-containers/runtime/virtcontainers/factory/cache"
@ -234,6 +235,11 @@ func (f *factory) Config() vc.VMConfig {
return f.base.Config() return f.base.Config()
} }
// GetVMStatus returns the status of the paused VM created by the base factory.
func (f *factory) GetVMStatus() []*pb.GrpcVMStatus {
return f.base.GetVMStatus()
}
// GetBaseVM returns a paused VM created by the base factory. // GetBaseVM returns a paused VM created by the base factory.
func (f *factory) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) { func (f *factory) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) {
return f.base.GetBaseVM(ctx, config) return f.base.GetBaseVM(ctx, config)

View File

@ -9,6 +9,7 @@ package grpccache
import ( import (
"context" "context"
"fmt" "fmt"
types "github.com/gogo/protobuf/types" types "github.com/gogo/protobuf/types"
pb "github.com/kata-containers/runtime/protocols/cache" pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
@ -60,3 +61,8 @@ func (g *grpccache) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM,
// CloseFactory closes the direct vm factory. // CloseFactory closes the direct vm factory.
func (g *grpccache) CloseFactory(ctx context.Context) { func (g *grpccache) CloseFactory(ctx context.Context) {
} }
// GetVMStatus is not supported
func (g *grpccache) GetVMStatus() []*pb.GrpcVMStatus {
panic("ERROR: package grpccache does not support GetVMStatus")
}

View File

@ -13,6 +13,7 @@ import (
"syscall" "syscall"
"time" "time"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
"github.com/kata-containers/runtime/virtcontainers/factory/base" "github.com/kata-containers/runtime/virtcontainers/factory/base"
"github.com/kata-containers/runtime/virtcontainers/store" "github.com/kata-containers/runtime/virtcontainers/store"
@ -78,6 +79,11 @@ func (t *template) CloseFactory(ctx context.Context) {
t.close() t.close()
} }
// GetVMStatus is not supported
func (t *template) GetVMStatus() []*pb.GrpcVMStatus {
panic("ERROR: package template does not support GetVMStatus")
}
func (t *template) close() { func (t *template) close() {
syscall.Unmount(t.statePath, 0) syscall.Unmount(t.statePath, 0)
os.RemoveAll(t.statePath) os.RemoveAll(t.statePath)

View File

@ -485,3 +485,11 @@ func (v *VM) ToGrpc(config VMConfig) (*pb.GrpcVM, error) {
CpuDelta: v.cpuDelta, CpuDelta: v.cpuDelta,
}, nil }, nil
} }
func (v *VM) GetVMStatus() *pb.GrpcVMStatus {
return &pb.GrpcVMStatus{
Pid: int64(v.hypervisor.pid()),
Cpu: v.cpu,
Memory: v.memory,
}
}