diff --git a/cli/config/configuration-qemu.toml.in b/cli/config/configuration-qemu.toml.in index 4be52f4a94..f655136bf7 100644 --- a/cli/config/configuration-qemu.toml.in +++ b/cli/config/configuration-qemu.toml.in @@ -219,6 +219,30 @@ enable_iothreads = @DEFENABLEIOTHREADS@ # Default false #enable_template = true +# The number of caches of VMCache: +# unspecified or == 0 --> VMCache is disabled +# > 0 --> will be set to the specified number +# +# VMCache is a function that creates VMs as caches before using it. +# It helps speed up new container creation. +# The function consists of a server and some clients communicating +# through Unix socket. The protocol is gRPC in protocols/cache/cache.proto. +# The VMCache server will create some VMs and cache them by factory cache. +# It will convert the VM to gRPC format and transport it when gets +# requestion from clients. +# Factory grpccache is the VMCache client. It will request gRPC format +# VM and convert it back to a VM. If VMCache function is enabled, +# kata-runtime will request VM from factory grpccache when it creates +# a new sandbox. +# +# Default 0 +#vm_cache_number = 0 + +# Specify the address of the Unix socket that is used by VMCache. +# +# Default /var/run/kata-containers/cache.sock +#vm_cache_endpoint = "/var/run/kata-containers/cache.sock" + [proxy.@PROJECT_TYPE@] path = "@PROXYPATH@" diff --git a/hack/update-generated-runtime-proto.sh b/hack/update-generated-runtime-proto.sh new file mode 100755 index 0000000000..d2881a48b8 --- /dev/null +++ b/hack/update-generated-runtime-proto.sh @@ -0,0 +1,14 @@ +# +# Copyright 2019 HyperHQ Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +protoc \ + -I=$GOPATH/src \ + -I=$GOPATH/src/github.com/gogo/protobuf/protobuf \ + --proto_path=protocols/cache \ + --gogofast_out=\ +Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\ +plugins=grpc:protocols/cache \ + protocols/cache/cache.proto diff --git a/pkg/katautils/config-settings.go b/pkg/katautils/config-settings.go index b85efcb5f3..da7ce7ba85 100644 --- a/pkg/katautils/config-settings.go +++ b/pkg/katautils/config-settings.go @@ -43,6 +43,8 @@ const defaultHotplugVFIOOnRootBus bool = false const defaultEntropySource = "/dev/urandom" const defaultGuestHookPath string = "" +const defaultVMCacheEndpoint string = "/var/run/kata-containers/cache.sock" + // Default config file used by stateless systems. var defaultRuntimeConfiguration = "/usr/share/defaults/kata-containers/configuration.toml" diff --git a/pkg/katautils/config.go b/pkg/katautils/config.go index 10668baae7..821381d940 100644 --- a/pkg/katautils/config.go +++ b/pkg/katautils/config.go @@ -77,7 +77,9 @@ type tomlConfig struct { } type factory struct { - Template bool `toml:"enable_template"` + Template bool `toml:"enable_template"` + VMCacheNumber uint `toml:"vm_cache_number"` + VMCacheEndpoint string `toml:"vm_cache_endpoint"` } type hypervisor struct { @@ -544,7 +546,14 @@ func newQemuHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) { } func newFactoryConfig(f factory) (oci.FactoryConfig, error) { - return oci.FactoryConfig{Template: f.Template}, nil + if f.VMCacheEndpoint == "" { + f.VMCacheEndpoint = defaultVMCacheEndpoint + } + return oci.FactoryConfig{ + Template: f.Template, + VMCacheNumber: f.VMCacheNumber, + VMCacheEndpoint: f.VMCacheEndpoint, + }, nil } func newShimConfig(s shim) (vc.ShimConfig, error) { @@ -910,6 +919,10 @@ func checkNetNsConfig(config oci.RuntimeConfig) error { // checkFactoryConfig ensures the VM factory configuration is valid. func checkFactoryConfig(config oci.RuntimeConfig) error { + if config.FactoryConfig.Template && config.FactoryConfig.VMCacheNumber > 0 { + return errors.New("VM factory cannot work together with VM cache") + } + if config.FactoryConfig.Template { if config.HypervisorConfig.InitrdPath == "" { return errors.New("Factory option enable_template requires an initrd image") @@ -920,6 +933,18 @@ func checkFactoryConfig(config oci.RuntimeConfig) error { } } + if config.FactoryConfig.VMCacheNumber > 0 { + if config.HypervisorType != vc.QemuHypervisor { + return errors.New("VM cache just support qemu") + } + if config.AgentType != vc.KataContainersAgent { + return errors.New("VM cache just support kata agent") + } + if config.HypervisorConfig.UseVSock { + return errors.New("config vsock conflicts with VM cache, please disable one of them") + } + } + return nil } diff --git a/pkg/katautils/config_test.go b/pkg/katautils/config_test.go index 3c8efa06c4..1a4f5f59ea 100644 --- a/pkg/katautils/config_test.go +++ b/pkg/katautils/config_test.go @@ -184,6 +184,10 @@ func createAllRuntimeConfigFiles(dir, hypervisor string) (config testRuntimeConf Enable: false, } + factoryConfig := oci.FactoryConfig{ + VMCacheEndpoint: defaultVMCacheEndpoint, + } + runtimeConfig := oci.RuntimeConfig{ HypervisorType: defaultHypervisor, HypervisorConfig: hypervisorConfig, @@ -199,6 +203,8 @@ func createAllRuntimeConfigFiles(dir, hypervisor string) (config testRuntimeConf NetmonConfig: netmonConfig, DisableNewNetNs: disableNewNetNs, + + FactoryConfig: factoryConfig, } err = SetKernelParams(&runtimeConfig) @@ -626,6 +632,10 @@ func TestMinimalRuntimeConfig(t *testing.T) { Enable: false, } + expectedFactoryConfig := oci.FactoryConfig{ + VMCacheEndpoint: defaultVMCacheEndpoint, + } + expectedConfig := oci.RuntimeConfig{ HypervisorType: defaultHypervisor, HypervisorConfig: expectedHypervisorConfig, @@ -640,6 +650,8 @@ func TestMinimalRuntimeConfig(t *testing.T) { ShimConfig: expectedShimConfig, NetmonConfig: expectedNetmonConfig, + + FactoryConfig: expectedFactoryConfig, } err = SetKernelParams(&expectedConfig) if err != nil { @@ -1376,7 +1388,8 @@ func TestUpdateRuntimeConfigurationFactoryConfig(t *testing.T) { config := oci.RuntimeConfig{} expectedFactoryConfig := oci.FactoryConfig{ - Template: true, + Template: true, + VMCacheEndpoint: defaultVMCacheEndpoint, } tomlConf := tomlConfig{Factory: factory{Template: true}} diff --git a/pkg/katautils/create.go b/pkg/katautils/create.go index 0e07e8003c..3b37393fd9 100644 --- a/pkg/katautils/create.go +++ b/pkg/katautils/create.go @@ -53,12 +53,14 @@ func needSystemd(config vc.HypervisorConfig) bool { // HandleFactory set the factory func HandleFactory(ctx context.Context, vci vc.VC, runtimeConfig *oci.RuntimeConfig) { - if !runtimeConfig.FactoryConfig.Template { + if !runtimeConfig.FactoryConfig.Template && runtimeConfig.FactoryConfig.VMCacheNumber == 0 { return } factoryConfig := vf.Config{ - Template: true, + Template: runtimeConfig.FactoryConfig.Template, + VMCache: runtimeConfig.FactoryConfig.VMCacheNumber > 0, + VMCacheEndpoint: runtimeConfig.FactoryConfig.VMCacheEndpoint, VMConfig: vc.VMConfig{ HypervisorType: runtimeConfig.HypervisorType, HypervisorConfig: runtimeConfig.HypervisorConfig, @@ -66,6 +68,10 @@ func HandleFactory(ctx context.Context, vci vc.VC, runtimeConfig *oci.RuntimeCon AgentConfig: runtimeConfig.AgentConfig, }, } + if runtimeConfig.FactoryConfig.VMCacheNumber > 0 { + factoryConfig.VMConfig.ProxyType = runtimeConfig.ProxyType + factoryConfig.VMConfig.ProxyConfig = runtimeConfig.ProxyConfig + } kataUtilsLogger.WithField("factory", factoryConfig).Info("load vm factory") diff --git a/protocols/cache/cache.pb.go b/protocols/cache/cache.pb.go new file mode 100644 index 0000000000..052ee4db95 --- /dev/null +++ b/protocols/cache/cache.pb.go @@ -0,0 +1,844 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: cache.proto + +/* + Package cache is a generated protocol buffer package. + + It is generated from these files: + cache.proto + + It has these top-level messages: + GrpcVMConfig + GrpcVM +*/ +package cache + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/gogo/protobuf/types" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type GrpcVMConfig struct { + Data []byte `protobuf:"bytes,1,opt,name=Data,proto3" json:"Data,omitempty"` + AgentConfig []byte `protobuf:"bytes,2,opt,name=AgentConfig,proto3" json:"AgentConfig,omitempty"` +} + +func (m *GrpcVMConfig) Reset() { *m = GrpcVMConfig{} } +func (m *GrpcVMConfig) String() string { return proto.CompactTextString(m) } +func (*GrpcVMConfig) ProtoMessage() {} +func (*GrpcVMConfig) Descriptor() ([]byte, []int) { return fileDescriptorCache, []int{0} } + +func (m *GrpcVMConfig) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *GrpcVMConfig) GetAgentConfig() []byte { + if m != nil { + return m.AgentConfig + } + return nil +} + +type GrpcVM struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Hypervisor []byte `protobuf:"bytes,2,opt,name=hypervisor,proto3" json:"hypervisor,omitempty"` + ProxyPid int64 `protobuf:"varint,3,opt,name=proxyPid,proto3" json:"proxyPid,omitempty"` + ProxyURL string `protobuf:"bytes,4,opt,name=proxyURL,proto3" json:"proxyURL,omitempty"` + Cpu uint32 `protobuf:"varint,5,opt,name=cpu,proto3" json:"cpu,omitempty"` + Memory uint32 `protobuf:"varint,6,opt,name=memory,proto3" json:"memory,omitempty"` + CpuDelta uint32 `protobuf:"varint,7,opt,name=cpuDelta,proto3" json:"cpuDelta,omitempty"` +} + +func (m *GrpcVM) Reset() { *m = GrpcVM{} } +func (m *GrpcVM) String() string { return proto.CompactTextString(m) } +func (*GrpcVM) ProtoMessage() {} +func (*GrpcVM) Descriptor() ([]byte, []int) { return fileDescriptorCache, []int{1} } + +func (m *GrpcVM) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *GrpcVM) GetHypervisor() []byte { + if m != nil { + return m.Hypervisor + } + return nil +} + +func (m *GrpcVM) GetProxyPid() int64 { + if m != nil { + return m.ProxyPid + } + return 0 +} + +func (m *GrpcVM) GetProxyURL() string { + if m != nil { + return m.ProxyURL + } + return "" +} + +func (m *GrpcVM) GetCpu() uint32 { + if m != nil { + return m.Cpu + } + return 0 +} + +func (m *GrpcVM) GetMemory() uint32 { + if m != nil { + return m.Memory + } + return 0 +} + +func (m *GrpcVM) GetCpuDelta() uint32 { + if m != nil { + return m.CpuDelta + } + return 0 +} + +func init() { + proto.RegisterType((*GrpcVMConfig)(nil), "cache.GrpcVMConfig") + proto.RegisterType((*GrpcVM)(nil), "cache.GrpcVM") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for CacheService service + +type CacheServiceClient interface { + Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error) + GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error) +} + +type cacheServiceClient struct { + cc *grpc.ClientConn +} + +func NewCacheServiceClient(cc *grpc.ClientConn) CacheServiceClient { + return &cacheServiceClient{cc} +} + +func (c *cacheServiceClient) Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error) { + out := new(GrpcVMConfig) + err := grpc.Invoke(ctx, "/cache.CacheService/Config", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *cacheServiceClient) GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error) { + out := new(GrpcVM) + err := grpc.Invoke(ctx, "/cache.CacheService/GetBaseVM", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for CacheService service + +type CacheServiceServer interface { + Config(context.Context, *google_protobuf.Empty) (*GrpcVMConfig, error) + GetBaseVM(context.Context, *google_protobuf.Empty) (*GrpcVM, error) +} + +func RegisterCacheServiceServer(s *grpc.Server, srv CacheServiceServer) { + s.RegisterService(&_CacheService_serviceDesc, srv) +} + +func _CacheService_Config_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CacheServiceServer).Config(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cache.CacheService/Config", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CacheServiceServer).Config(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _CacheService_GetBaseVM_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(google_protobuf.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CacheServiceServer).GetBaseVM(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cache.CacheService/GetBaseVM", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CacheServiceServer).GetBaseVM(ctx, req.(*google_protobuf.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +var _CacheService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "cache.CacheService", + HandlerType: (*CacheServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Config", + Handler: _CacheService_Config_Handler, + }, + { + MethodName: "GetBaseVM", + Handler: _CacheService_GetBaseVM_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "cache.proto", +} + +func (m *GrpcVMConfig) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GrpcVMConfig) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Data) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintCache(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + if len(m.AgentConfig) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintCache(dAtA, i, uint64(len(m.AgentConfig))) + i += copy(dAtA[i:], m.AgentConfig) + } + return i, nil +} + +func (m *GrpcVM) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GrpcVM) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Id) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintCache(dAtA, i, uint64(len(m.Id))) + i += copy(dAtA[i:], m.Id) + } + if len(m.Hypervisor) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintCache(dAtA, i, uint64(len(m.Hypervisor))) + i += copy(dAtA[i:], m.Hypervisor) + } + if m.ProxyPid != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintCache(dAtA, i, uint64(m.ProxyPid)) + } + if len(m.ProxyURL) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintCache(dAtA, i, uint64(len(m.ProxyURL))) + i += copy(dAtA[i:], m.ProxyURL) + } + if m.Cpu != 0 { + dAtA[i] = 0x28 + i++ + i = encodeVarintCache(dAtA, i, uint64(m.Cpu)) + } + if m.Memory != 0 { + dAtA[i] = 0x30 + i++ + i = encodeVarintCache(dAtA, i, uint64(m.Memory)) + } + if m.CpuDelta != 0 { + dAtA[i] = 0x38 + i++ + i = encodeVarintCache(dAtA, i, uint64(m.CpuDelta)) + } + return i, nil +} + +func encodeVarintCache(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *GrpcVMConfig) Size() (n int) { + var l int + _ = l + l = len(m.Data) + if l > 0 { + n += 1 + l + sovCache(uint64(l)) + } + l = len(m.AgentConfig) + if l > 0 { + n += 1 + l + sovCache(uint64(l)) + } + return n +} + +func (m *GrpcVM) Size() (n int) { + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovCache(uint64(l)) + } + l = len(m.Hypervisor) + if l > 0 { + n += 1 + l + sovCache(uint64(l)) + } + if m.ProxyPid != 0 { + n += 1 + sovCache(uint64(m.ProxyPid)) + } + l = len(m.ProxyURL) + if l > 0 { + n += 1 + l + sovCache(uint64(l)) + } + if m.Cpu != 0 { + n += 1 + sovCache(uint64(m.Cpu)) + } + if m.Memory != 0 { + n += 1 + sovCache(uint64(m.Memory)) + } + if m.CpuDelta != 0 { + n += 1 + sovCache(uint64(m.CpuDelta)) + } + return n +} + +func sovCache(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozCache(x uint64) (n int) { + return sovCache(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *GrpcVMConfig) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GrpcVMConfig: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GrpcVMConfig: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCache + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AgentConfig", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCache + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.AgentConfig = append(m.AgentConfig[:0], dAtA[iNdEx:postIndex]...) + if m.AgentConfig == nil { + m.AgentConfig = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCache(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCache + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GrpcVM) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GrpcVM: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GrpcVM: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCache + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hypervisor", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCache + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Hypervisor = append(m.Hypervisor[:0], dAtA[iNdEx:postIndex]...) + if m.Hypervisor == nil { + m.Hypervisor = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProxyPid", wireType) + } + m.ProxyPid = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ProxyPid |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ProxyURL", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCache + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ProxyURL = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Cpu", wireType) + } + m.Cpu = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Cpu |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Memory", wireType) + } + m.Memory = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Memory |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CpuDelta", wireType) + } + m.CpuDelta = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCache + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CpuDelta |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipCache(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCache + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipCache(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCache + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCache + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCache + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthCache + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCache + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipCache(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthCache = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowCache = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("cache.proto", fileDescriptorCache) } + +var fileDescriptorCache = []byte{ + // 299 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xd1, 0x6a, 0xc2, 0x30, + 0x18, 0x85, 0x49, 0xab, 0xdd, 0xfc, 0xad, 0x63, 0x64, 0x20, 0xa1, 0x83, 0x52, 0xbc, 0xea, 0x55, + 0x05, 0xc7, 0x1e, 0x60, 0xda, 0xe1, 0xcd, 0x84, 0xd1, 0x31, 0xef, 0x6b, 0x1a, 0x6b, 0xc1, 0x9a, + 0x10, 0xd3, 0xb1, 0xbe, 0xd8, 0x9e, 0x61, 0x97, 0x7b, 0x84, 0xd1, 0x27, 0x19, 0x4d, 0xbb, 0xa2, + 0x17, 0xde, 0xe5, 0x9c, 0xff, 0x9c, 0x43, 0xf8, 0x60, 0x48, 0x63, 0xba, 0x63, 0x81, 0x90, 0x5c, + 0x71, 0xdc, 0xd7, 0xc2, 0xb9, 0x4f, 0x39, 0x4f, 0xf7, 0x6c, 0xaa, 0xcd, 0x4d, 0xb1, 0x9d, 0xb2, + 0x5c, 0xa8, 0xb2, 0xc9, 0x4c, 0x42, 0xb0, 0x97, 0x52, 0xd0, 0xf5, 0x6a, 0xc1, 0x0f, 0xdb, 0x2c, + 0xc5, 0x18, 0x7a, 0x61, 0xac, 0x62, 0x82, 0x3c, 0xe4, 0xdb, 0x91, 0x7e, 0x63, 0x0f, 0x86, 0x4f, + 0x29, 0x3b, 0xa8, 0x26, 0x42, 0x0c, 0x7d, 0x3a, 0xb5, 0x26, 0x5f, 0x08, 0xac, 0x66, 0x06, 0xdf, + 0x80, 0x91, 0x25, 0xba, 0x3e, 0x88, 0x8c, 0x2c, 0xc1, 0x2e, 0xc0, 0xae, 0x14, 0x4c, 0x7e, 0x64, + 0x47, 0x2e, 0xdb, 0xee, 0x89, 0x83, 0x1d, 0xb8, 0x16, 0x92, 0x7f, 0x96, 0xaf, 0x59, 0x42, 0x4c, + 0x0f, 0xf9, 0x66, 0xd4, 0xe9, 0xee, 0xf6, 0x1e, 0xbd, 0x90, 0x9e, 0x5e, 0xec, 0x34, 0xbe, 0x05, + 0x93, 0x8a, 0x82, 0xf4, 0x3d, 0xe4, 0x8f, 0xa2, 0xfa, 0x89, 0xc7, 0x60, 0xe5, 0x2c, 0xe7, 0xb2, + 0x24, 0x96, 0x36, 0x5b, 0x55, 0xaf, 0x50, 0x51, 0x84, 0x6c, 0xaf, 0x62, 0x72, 0xa5, 0x2f, 0x9d, + 0x9e, 0x95, 0x60, 0x2f, 0x6a, 0x48, 0x6f, 0xf5, 0x77, 0x28, 0xc3, 0x8f, 0x60, 0xb5, 0x20, 0xc6, + 0x41, 0x83, 0x2d, 0xf8, 0xc7, 0x16, 0x3c, 0xd7, 0xd8, 0x9c, 0xbb, 0xa0, 0x41, 0x7c, 0x46, 0x6d, + 0x06, 0x83, 0x25, 0x53, 0xf3, 0xf8, 0xc8, 0xd6, 0xab, 0x8b, 0xcd, 0xd1, 0x59, 0x73, 0x6e, 0x7f, + 0x57, 0x2e, 0xfa, 0xa9, 0x5c, 0xf4, 0x5b, 0xb9, 0x68, 0x63, 0xe9, 0xf0, 0xc3, 0x5f, 0x00, 0x00, + 0x00, 0xff, 0xff, 0xb5, 0xef, 0x80, 0xc1, 0xc1, 0x01, 0x00, 0x00, +} diff --git a/protocols/cache/cache.proto b/protocols/cache/cache.proto new file mode 100644 index 0000000000..6eada1eb02 --- /dev/null +++ b/protocols/cache/cache.proto @@ -0,0 +1,35 @@ +// +// Copyright 2019 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +syntax = "proto3"; + +package cache; + +import "google/protobuf/empty.proto"; + +service CacheService { + rpc Config(google.protobuf.Empty) returns (GrpcVMConfig); + rpc GetBaseVM(google.protobuf.Empty) returns (GrpcVM); +} + +message GrpcVMConfig { + bytes Data = 1; + bytes AgentConfig = 2; +} + +message GrpcVM { + string id = 1; + + bytes hypervisor = 2; + + int64 proxyPid = 3; + string proxyURL = 4; + + uint32 cpu = 5; + uint32 memory = 6; + + uint32 cpuDelta = 7; +} diff --git a/virtcontainers/agent.go b/virtcontainers/agent.go index 9cfce0f349..804f47b789 100644 --- a/virtcontainers/agent.go +++ b/virtcontainers/agent.go @@ -151,6 +151,9 @@ type agent interface { // set to use an existing proxy setProxy(sandbox *Sandbox, proxy proxy, pid int, url string) error + // set to use an existing proxy from Grpc + setProxyFromGrpc(proxy proxy, pid int, url string) + // get agent url getAgentURL() (string, error) @@ -225,6 +228,9 @@ type agent interface { // configure will update agent settings based on provided arguments configure(h hypervisor, id, sharePath string, builtin bool, config interface{}) error + // configureFromGrpc will update agent settings based on provided arguments which from Grpc + configureFromGrpc(id string, builtin bool, config interface{}) error + // getVMPath will return the agent vm socket's directory path getVMPath(id string) string diff --git a/virtcontainers/factory.go b/virtcontainers/factory.go index 45306a5f0a..8579d8a975 100644 --- a/virtcontainers/factory.go +++ b/virtcontainers/factory.go @@ -9,9 +9,15 @@ import "context" // Factory controls how a new VM is created. type Factory interface { + // Config returns base factory config. + Config() VMConfig + // GetVM gets a new VM from the factory. GetVM(ctx context.Context, config VMConfig) (*VM, error) + // GetBaseVM returns a paused VM created by the base factory. + GetBaseVM(ctx context.Context, config VMConfig) (*VM, error) + // CloseFactory closes and cleans up the factory. CloseFactory(ctx context.Context) } diff --git a/virtcontainers/factory/factory.go b/virtcontainers/factory/factory.go index e11e1632c6..4bbe198372 100644 --- a/virtcontainers/factory/factory.go +++ b/virtcontainers/factory/factory.go @@ -8,13 +8,14 @@ package factory import ( "context" "fmt" - "reflect" vc "github.com/kata-containers/runtime/virtcontainers" "github.com/kata-containers/runtime/virtcontainers/factory/base" "github.com/kata-containers/runtime/virtcontainers/factory/cache" "github.com/kata-containers/runtime/virtcontainers/factory/direct" + "github.com/kata-containers/runtime/virtcontainers/factory/grpccache" "github.com/kata-containers/runtime/virtcontainers/factory/template" + "github.com/kata-containers/runtime/virtcontainers/utils" opentracing "github.com/opentracing/opentracing-go" "github.com/sirupsen/logrus" ) @@ -24,7 +25,10 @@ var factoryLogger = logrus.FieldLogger(logrus.New()) // Config is a collection of VM factory configurations. type Config struct { Template bool - Cache uint + + VMCache bool + Cache uint + VMCacheEndpoint string VMConfig vc.VMConfig } @@ -65,6 +69,11 @@ func NewFactory(ctx context.Context, config Config, fetchOnly bool) (vc.Factory, } else { b = template.New(ctx, config.VMConfig) } + } else if config.VMCache && config.Cache == 0 { + b, err = grpccache.New(ctx, config.VMCacheEndpoint) + if err != nil { + return nil, err + } } else { b = direct.New(ctx, config.VMConfig) } @@ -100,71 +109,6 @@ func resetHypervisorConfig(config *vc.VMConfig) { config.ProxyConfig = vc.ProxyConfig{} } -func compareStruct(foo, bar reflect.Value) bool { - for i := 0; i < foo.NumField(); i++ { - if !deepCompareValue(foo.Field(i), bar.Field(i)) { - return false - } - } - - return true -} - -func compareMap(foo, bar reflect.Value) bool { - if foo.Len() != bar.Len() { - return false - } - - for _, k := range foo.MapKeys() { - if !deepCompareValue(foo.MapIndex(k), bar.MapIndex(k)) { - return false - } - } - - return true -} - -func compareSlice(foo, bar reflect.Value) bool { - if foo.Len() != bar.Len() { - return false - } - for j := 0; j < foo.Len(); j++ { - if !deepCompareValue(foo.Index(j), bar.Index(j)) { - return false - } - } - return true -} - -func deepCompareValue(foo, bar reflect.Value) bool { - if !foo.IsValid() || !bar.IsValid() { - return foo.IsValid() == bar.IsValid() - } - - if foo.Type() != bar.Type() { - return false - } - switch foo.Kind() { - case reflect.Map: - return compareMap(foo, bar) - case reflect.Array: - fallthrough - case reflect.Slice: - return compareSlice(foo, bar) - case reflect.Struct: - return compareStruct(foo, bar) - default: - return foo.Interface() == bar.Interface() - } -} - -func deepCompare(foo, bar interface{}) bool { - v1 := reflect.ValueOf(foo) - v2 := reflect.ValueOf(bar) - - return deepCompareValue(v1, v2) -} - // It's important that baseConfig and newConfig are passed by value! func checkVMConfig(config1, config2 vc.VMConfig) error { if config1.HypervisorType != config2.HypervisorType { @@ -179,7 +123,7 @@ func checkVMConfig(config1, config2 vc.VMConfig) error { resetHypervisorConfig(&config1) resetHypervisorConfig(&config2) - if !deepCompare(config1, config2) { + if !utils.DeepCompare(config1, config2) { return fmt.Errorf("hypervisor config does not match, base: %+v. new: %+v", config1, config2) } @@ -282,6 +226,16 @@ func (f *factory) GetVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) return vm, nil } +// Config returns base factory config. +func (f *factory) Config() vc.VMConfig { + return f.base.Config() +} + +// GetBaseVM returns a paused VM created by the base factory. +func (f *factory) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) { + return f.base.GetBaseVM(ctx, config) +} + // CloseFactory closes the factory. func (f *factory) CloseFactory(ctx context.Context) { f.base.CloseFactory(ctx) diff --git a/virtcontainers/factory/factory_test.go b/virtcontainers/factory/factory_test.go index deaa02c20e..a8e08606c2 100644 --- a/virtcontainers/factory/factory_test.go +++ b/virtcontainers/factory/factory_test.go @@ -10,11 +10,11 @@ import ( "io/ioutil" "testing" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - vc "github.com/kata-containers/runtime/virtcontainers" "github.com/kata-containers/runtime/virtcontainers/factory/base" + "github.com/kata-containers/runtime/virtcontainers/utils" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" ) func TestNewFactory(t *testing.T) { @@ -279,41 +279,41 @@ func TestDeepCompare(t *testing.T) { foo := vc.VMConfig{} bar := vc.VMConfig{} - assert.True(deepCompare(foo, bar)) + assert.True(utils.DeepCompare(foo, bar)) foo.HypervisorConfig.NumVCPUs = 1 - assert.False(deepCompare(foo, bar)) + assert.False(utils.DeepCompare(foo, bar)) bar.HypervisorConfig.NumVCPUs = 1 - assert.True(deepCompare(foo, bar)) + assert.True(utils.DeepCompare(foo, bar)) // slice foo.HypervisorConfig.KernelParams = []vc.Param{} - assert.True(deepCompare(foo, bar)) + assert.True(utils.DeepCompare(foo, bar)) foo.HypervisorConfig.KernelParams = append(foo.HypervisorConfig.KernelParams, vc.Param{Key: "key", Value: "value"}) - assert.False(deepCompare(foo, bar)) + assert.False(utils.DeepCompare(foo, bar)) bar.HypervisorConfig.KernelParams = append(bar.HypervisorConfig.KernelParams, vc.Param{Key: "key", Value: "value"}) - assert.True(deepCompare(foo, bar)) + assert.True(utils.DeepCompare(foo, bar)) // map var fooMap map[string]vc.VMConfig var barMap map[string]vc.VMConfig - assert.False(deepCompare(foo, fooMap)) - assert.True(deepCompare(fooMap, barMap)) + assert.False(utils.DeepCompare(foo, fooMap)) + assert.True(utils.DeepCompare(fooMap, barMap)) fooMap = make(map[string]vc.VMConfig) - assert.True(deepCompare(fooMap, barMap)) + assert.True(utils.DeepCompare(fooMap, barMap)) fooMap["foo"] = foo - assert.False(deepCompare(fooMap, barMap)) + assert.False(utils.DeepCompare(fooMap, barMap)) barMap = make(map[string]vc.VMConfig) - assert.False(deepCompare(fooMap, barMap)) + assert.False(utils.DeepCompare(fooMap, barMap)) barMap["foo"] = bar - assert.True(deepCompare(fooMap, barMap)) + assert.True(utils.DeepCompare(fooMap, barMap)) // invalid interface var f1 vc.Factory var f2 vc.Factory var f3 base.FactoryBase - assert.True(deepCompare(f1, f2)) - assert.True(deepCompare(f1, f3)) + assert.True(utils.DeepCompare(f1, f2)) + assert.True(utils.DeepCompare(f1, f3)) // valid interface var config Config @@ -331,8 +331,8 @@ func TestDeepCompare(t *testing.T) { } f1, err = NewFactory(ctx, config, false) assert.Nil(err) - assert.True(deepCompare(f1, f1)) + assert.True(utils.DeepCompare(f1, f1)) f2, err = NewFactory(ctx, config, false) assert.Nil(err) - assert.False(deepCompare(f1, f2)) + assert.False(utils.DeepCompare(f1, f2)) } diff --git a/virtcontainers/factory/grpccache/grpccache.go b/virtcontainers/factory/grpccache/grpccache.go new file mode 100644 index 0000000000..8dfcb45eb7 --- /dev/null +++ b/virtcontainers/factory/grpccache/grpccache.go @@ -0,0 +1,62 @@ +// Copyright (c) 2019 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// grpccache implements base vm factory that get base vm from grpc + +package grpccache + +import ( + "context" + "fmt" + types "github.com/gogo/protobuf/types" + pb "github.com/kata-containers/runtime/protocols/cache" + vc "github.com/kata-containers/runtime/virtcontainers" + "github.com/kata-containers/runtime/virtcontainers/factory/base" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +type grpccache struct { + conn *grpc.ClientConn + config *vc.VMConfig +} + +// New returns a new direct vm factory. +func New(ctx context.Context, endpoint string) (base.FactoryBase, error) { + conn, err := grpc.Dial(fmt.Sprintf("unix://%s", endpoint), grpc.WithInsecure()) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect %q", endpoint) + } + + jConfig, err := pb.NewCacheServiceClient(conn).Config(ctx, &types.Empty{}) + if err != nil { + return nil, errors.Wrapf(err, "failed to Config") + } + + config, err := vc.GrpcToVMConfig(jConfig) + if err != nil { + return nil, errors.Wrapf(err, "failed to convert JSON to VMConfig") + } + + return &grpccache{conn: conn, config: config}, nil +} + +// Config returns the direct factory's configuration. +func (g *grpccache) Config() vc.VMConfig { + return *g.config +} + +// GetBaseVM create a new VM directly. +func (g *grpccache) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) { + defer g.conn.Close() + gVM, err := pb.NewCacheServiceClient(g.conn).GetBaseVM(ctx, &types.Empty{}) + if err != nil { + return nil, errors.Wrapf(err, "failed to GetBaseVM") + } + return vc.NewVMFromGrpc(ctx, gVM, *g.config) +} + +// CloseFactory closes the direct vm factory. +func (g *grpccache) CloseFactory(ctx context.Context) { +} diff --git a/virtcontainers/fc.go b/virtcontainers/fc.go index d8bcad9e4f..c197e80a15 100644 --- a/virtcontainers/fc.go +++ b/virtcontainers/fc.go @@ -7,6 +7,7 @@ package virtcontainers import ( "context" + "errors" "fmt" "net/url" "os/exec" @@ -706,3 +707,11 @@ func (fc *firecracker) cleanup() error { func (fc *firecracker) pid() int { return fc.info.PID } + +func (fc *firecracker) fromGrpc(ctx context.Context, hypervisorConfig *HypervisorConfig, store *store.VCStore, j []byte) error { + return errors.New("firecracker is not supported by VM cache") +} + +func (fc *firecracker) toGrpc() ([]byte, error) { + return nil, errors.New("firecracker is not supported by VM cache") +} diff --git a/virtcontainers/hyperstart_agent.go b/virtcontainers/hyperstart_agent.go index da6b9ecb69..dc9f3aed40 100644 --- a/virtcontainers/hyperstart_agent.go +++ b/virtcontainers/hyperstart_agent.go @@ -327,6 +327,10 @@ func (h *hyper) configure(hv hypervisor, id, sharePath string, builtin bool, con return hv.addDevice(sharedVolume, fsDev) } +func (h *hyper) configureFromGrpc(id string, builtin bool, config interface{}) error { + return nil +} + func (h *hyper) createSandbox(sandbox *Sandbox) (err error) { return h.configure(sandbox.hypervisor, "", h.getSharePath(sandbox.id), false, nil) } @@ -999,6 +1003,12 @@ func (h *hyper) setProxy(sandbox *Sandbox, proxy proxy, pid int, url string) err return nil } +func (h *hyper) setProxyFromGrpc(proxy proxy, pid int, url string) { + h.proxy = proxy + h.state.ProxyPid = pid + h.state.URL = url +} + func (h *hyper) getGuestDetails(*grpc.GuestDetailsRequest) (*grpc.GuestDetailsResponse, error) { // hyperstart-agent does not support getGuestDetails return nil, nil diff --git a/virtcontainers/hypervisor.go b/virtcontainers/hypervisor.go index 6966b9e782..49885081a0 100644 --- a/virtcontainers/hypervisor.go +++ b/virtcontainers/hypervisor.go @@ -610,4 +610,6 @@ type hypervisor interface { getThreadIDs() (*threadIDs, error) cleanup() error pid() int + fromGrpc(ctx context.Context, hypervisorConfig *HypervisorConfig, store *store.VCStore, j []byte) error + toGrpc() ([]byte, error) } diff --git a/virtcontainers/kata_agent.go b/virtcontainers/kata_agent.go index f07b891068..d26c512ef4 100644 --- a/virtcontainers/kata_agent.go +++ b/virtcontainers/kata_agent.go @@ -221,7 +221,7 @@ func (k *kataAgent) capabilities() types.Capabilities { return caps } -func (k *kataAgent) configure(h hypervisor, id, sharePath string, builtin bool, config interface{}) error { +func (k *kataAgent) internalConfigure(h hypervisor, id, sharePath string, builtin bool, config interface{}) error { if config != nil { switch c := config.(type) { case KataAgentConfig: @@ -234,20 +234,32 @@ func (k *kataAgent) configure(h hypervisor, id, sharePath string, builtin bool, } } + if builtin { + k.proxyBuiltIn = true + } + + return nil +} + +func (k *kataAgent) configure(h hypervisor, id, sharePath string, builtin bool, config interface{}) error { + err := k.internalConfigure(h, id, sharePath, builtin, config) + if err != nil { + return err + } + switch s := k.vmSocket.(type) { case types.Socket: - err := h.addDevice(s, serialPortDev) + err = h.addDevice(s, serialPortDev) if err != nil { return err } case kataVSOCK: - var err error s.vhostFd, s.contextID, err = utils.FindContextID() if err != nil { return err } s.port = uint32(vSockPort) - if err := h.addDevice(s, vSockPCIDev); err != nil { + if err = h.addDevice(s, vSockPCIDev); err != nil { return err } k.vmSocket = s @@ -255,10 +267,6 @@ func (k *kataAgent) configure(h hypervisor, id, sharePath string, builtin bool, return fmt.Errorf("Invalid config type") } - if builtin { - k.proxyBuiltIn = true - } - // Neither create shared directory nor add 9p device if hypervisor // doesn't support filesystem sharing. caps := h.capabilities() @@ -273,13 +281,17 @@ func (k *kataAgent) configure(h hypervisor, id, sharePath string, builtin bool, HostPath: sharePath, } - if err := os.MkdirAll(sharedVolume.HostPath, store.DirMode); err != nil { + if err = os.MkdirAll(sharedVolume.HostPath, store.DirMode); err != nil { return err } return h.addDevice(sharedVolume, fsDev) } +func (k *kataAgent) configureFromGrpc(id string, builtin bool, config interface{}) error { + return k.internalConfigure(nil, id, "", builtin, config) +} + func (k *kataAgent) createSandbox(sandbox *Sandbox) error { span, _ := k.trace("createSandbox") defer span.Finish() @@ -587,6 +599,12 @@ func (k *kataAgent) setProxy(sandbox *Sandbox, proxy proxy, pid int, url string) return nil } +func (k *kataAgent) setProxyFromGrpc(proxy proxy, pid int, url string) { + k.proxy = proxy + k.state.ProxyPid = pid + k.state.URL = url +} + func (k *kataAgent) startSandbox(sandbox *Sandbox) error { span, _ := k.trace("startSandbox") defer span.Finish() diff --git a/virtcontainers/mock_hypervisor.go b/virtcontainers/mock_hypervisor.go index fbf1c379bc..b1efa6f670 100644 --- a/virtcontainers/mock_hypervisor.go +++ b/virtcontainers/mock_hypervisor.go @@ -7,6 +7,7 @@ package virtcontainers import ( "context" + "errors" "os" "github.com/kata-containers/runtime/virtcontainers/store" @@ -105,3 +106,11 @@ func (m *mockHypervisor) cleanup() error { func (m *mockHypervisor) pid() int { return m.mockPid } + +func (m *mockHypervisor) fromGrpc(ctx context.Context, hypervisorConfig *HypervisorConfig, store *store.VCStore, j []byte) error { + return errors.New("mockHypervisor is not supported by VM cache") +} + +func (m *mockHypervisor) toGrpc() ([]byte, error) { + return nil, errors.New("firecracker is not supported by VM cache") +} diff --git a/virtcontainers/noop_agent.go b/virtcontainers/noop_agent.go index 6ba15d4b33..db9a3dcbdf 100644 --- a/virtcontainers/noop_agent.go +++ b/virtcontainers/noop_agent.go @@ -171,6 +171,10 @@ func (n *noopAgent) configure(h hypervisor, id, sharePath string, builtin bool, return nil } +func (n *noopAgent) configureFromGrpc(id string, builtin bool, config interface{}) error { + return nil +} + // getVMPath is the Noop agent vm path getter. It does nothing. func (n *noopAgent) getVMPath(id string) string { return "" @@ -201,6 +205,9 @@ func (n *noopAgent) setProxy(sandbox *Sandbox, proxy proxy, pid int, url string) return nil } +func (n *noopAgent) setProxyFromGrpc(proxy proxy, pid int, url string) { +} + // getGuestDetails is the Noop agent GuestDetails queryer. It does nothing. func (n *noopAgent) getGuestDetails(*grpc.GuestDetailsRequest) (*grpc.GuestDetailsResponse, error) { return nil, nil diff --git a/virtcontainers/pkg/oci/utils.go b/virtcontainers/pkg/oci/utils.go index 4ffb4ec957..269edc9620 100644 --- a/virtcontainers/pkg/oci/utils.go +++ b/virtcontainers/pkg/oci/utils.go @@ -96,6 +96,12 @@ type CompatOCISpec struct { type FactoryConfig struct { // Template enables VM templating support in VM factory. Template bool + + // VMCacheNumber specifies the the number of caches of VMCache. + VMCacheNumber uint + + // VMCacheEndpoint specifies the endpoint of transport VM from the VM cache server to runtime. + VMCacheEndpoint string } // RuntimeConfig aggregates all runtime specific settings diff --git a/virtcontainers/qemu.go b/virtcontainers/qemu.go index 25e78ea06f..7ee216433f 100644 --- a/virtcontainers/qemu.go +++ b/virtcontainers/qemu.go @@ -7,6 +7,7 @@ package virtcontainers import ( "context" + "encoding/json" "fmt" "io/ioutil" "math" @@ -1600,3 +1601,55 @@ func (q *qemu) pid() int { return pid } + +type qemuGrpc struct { + ID string + QmpChannelpath string + State QemuState + NvdimmCount int + + // Most members of q.qemuConfig are just to generate + // q.qemuConfig.qemuParams that is used by LaunchQemu except + // q.qemuConfig.SMP. + // So just transport q.qemuConfig.SMP from VM Cache server to runtime. + QemuSMP govmmQemu.SMP +} + +func (q *qemu) fromGrpc(ctx context.Context, hypervisorConfig *HypervisorConfig, store *store.VCStore, j []byte) error { + var qp qemuGrpc + err := json.Unmarshal(j, &qp) + if err != nil { + return err + } + + q.id = qp.ID + q.store = store + q.config = *hypervisorConfig + q.qmpMonitorCh.ctx = ctx + q.qmpMonitorCh.path = qp.QmpChannelpath + q.qemuConfig.Ctx = ctx + q.state = qp.State + q.arch = newQemuArch(q.config) + q.ctx = ctx + q.nvdimmCount = qp.NvdimmCount + + q.qemuConfig.SMP = qp.QemuSMP + + return nil +} + +func (q *qemu) toGrpc() ([]byte, error) { + q.qmpShutdown() + + q.cleanup() + qp := qemuGrpc{ + ID: q.id, + QmpChannelpath: q.qmpMonitorCh.path, + State: q.state, + NvdimmCount: q.nvdimmCount, + + QemuSMP: q.qemuConfig.SMP, + } + + return json.Marshal(&qp) +} diff --git a/virtcontainers/qemu_test.go b/virtcontainers/qemu_test.go index 6c79a9297a..067a9dfca9 100644 --- a/virtcontainers/qemu_test.go +++ b/virtcontainers/qemu_test.go @@ -424,3 +424,22 @@ func TestQemuCleanup(t *testing.T) { err := q.cleanup() assert.Nil(err) } + +func TestQemuGrpc(t *testing.T) { + assert := assert.New(t) + + config := newQemuConfig() + q := &qemu{ + id: "testqemu", + config: config, + } + + json, err := q.toGrpc() + assert.Nil(err) + + var q2 qemu + err = q2.fromGrpc(context.Background(), &config, nil, json) + assert.Nil(err) + + assert.True(q.id == q2.id) +} diff --git a/virtcontainers/utils/compare.go b/virtcontainers/utils/compare.go new file mode 100644 index 0000000000..e67b6bc476 --- /dev/null +++ b/virtcontainers/utils/compare.go @@ -0,0 +1,74 @@ +// Copyright (c) 2019 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package utils + +import "reflect" + +func compareStruct(foo, bar reflect.Value) bool { + for i := 0; i < foo.NumField(); i++ { + if !deepCompareValue(foo.Field(i), bar.Field(i)) { + return false + } + } + + return true +} + +func compareMap(foo, bar reflect.Value) bool { + if foo.Len() != bar.Len() { + return false + } + + for _, k := range foo.MapKeys() { + if !deepCompareValue(foo.MapIndex(k), bar.MapIndex(k)) { + return false + } + } + + return true +} + +func compareSlice(foo, bar reflect.Value) bool { + if foo.Len() != bar.Len() { + return false + } + for j := 0; j < foo.Len(); j++ { + if !deepCompareValue(foo.Index(j), bar.Index(j)) { + return false + } + } + return true +} + +func deepCompareValue(foo, bar reflect.Value) bool { + if !foo.IsValid() || !bar.IsValid() { + return foo.IsValid() == bar.IsValid() + } + + if foo.Type() != bar.Type() { + return false + } + switch foo.Kind() { + case reflect.Map: + return compareMap(foo, bar) + case reflect.Array: + fallthrough + case reflect.Slice: + return compareSlice(foo, bar) + case reflect.Struct: + return compareStruct(foo, bar) + default: + return foo.Interface() == bar.Interface() + } +} + +// DeepCompare compare foo and bar. +func DeepCompare(foo, bar interface{}) bool { + v1 := reflect.ValueOf(foo) + v2 := reflect.ValueOf(bar) + + return deepCompareValue(v1, v2) +} diff --git a/virtcontainers/vm.go b/virtcontainers/vm.go index ea0e6c3042..dc9d6f097b 100644 --- a/virtcontainers/vm.go +++ b/virtcontainers/vm.go @@ -7,10 +7,13 @@ package virtcontainers import ( "context" + "encoding/json" + "fmt" "os" "path/filepath" "time" + pb "github.com/kata-containers/runtime/protocols/cache" "github.com/kata-containers/runtime/virtcontainers/pkg/uuid" "github.com/kata-containers/runtime/virtcontainers/store" "github.com/sirupsen/logrus" @@ -52,6 +55,63 @@ func (c *VMConfig) Valid() error { return c.HypervisorConfig.valid() } +// ToGrpc convert VMConfig struct to grpc format pb.GrpcVMConfig. +func (c *VMConfig) ToGrpc() (*pb.GrpcVMConfig, error) { + data, err := json.Marshal(&c) + if err != nil { + return nil, err + } + + var agentConfig []byte + switch aconf := c.AgentConfig.(type) { + case HyperConfig: + agentConfig, err = json.Marshal(&aconf) + case KataAgentConfig: + agentConfig, err = json.Marshal(&aconf) + default: + err = fmt.Errorf("agent type %s is not supported by VM cache", c.AgentType) + } + if err != nil { + return nil, err + } + + return &pb.GrpcVMConfig{ + Data: data, + AgentConfig: agentConfig, + }, nil +} + +// GrpcToVMConfig convert grpc format pb.GrpcVMConfig to VMConfig struct. +func GrpcToVMConfig(j *pb.GrpcVMConfig) (*VMConfig, error) { + var config VMConfig + err := json.Unmarshal(j.Data, &config) + if err != nil { + return nil, err + } + + switch config.AgentType { + case HyperstartAgent: + var hyperConfig HyperConfig + err := json.Unmarshal(j.AgentConfig, &hyperConfig) + if err == nil { + config.AgentConfig = hyperConfig + } + case KataContainersAgent: + var kataConfig KataAgentConfig + err := json.Unmarshal(j.AgentConfig, &kataConfig) + if err == nil { + config.AgentConfig = kataConfig + } + default: + err = fmt.Errorf("agent type %s is not supported by VM cache", config.AgentType) + } + if err != nil { + return nil, err + } + + return &config, nil +} + func setupProxy(h hypervisor, agent agent, config VMConfig, id string) (int, string, proxy, error) { consoleURL, err := h.getSandboxConsole(id) if err != nil { @@ -187,6 +247,57 @@ func NewVM(ctx context.Context, config VMConfig) (*VM, error) { }, nil } +// NewVMFromGrpc creates a new VM based on provided pb.GrpcVM and VMConfig. +func NewVMFromGrpc(ctx context.Context, v *pb.GrpcVM, config VMConfig) (*VM, error) { + virtLog.WithField("GrpcVM", v).WithField("config", config).Info("create new vm from Grpc") + + hypervisor, err := newHypervisor(config.HypervisorType) + if err != nil { + return nil, err + } + + vcStore, err := store.NewVCStore(ctx, + store.SandboxConfigurationRoot(v.Id), + store.SandboxRuntimeRoot(v.Id)) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + virtLog.WithField("vm", v.Id).WithError(err).Error("failed to create new vm from Grpc") + virtLog.WithField("vm", v.Id).Errorf("Deleting store for %s", v.Id) + vcStore.Delete() + } + }() + + err = hypervisor.fromGrpc(ctx, &config.HypervisorConfig, vcStore, v.Hypervisor) + if err != nil { + return nil, err + } + + agent := newAgent(config.AgentType) + agent.configureFromGrpc(v.Id, isProxyBuiltIn(config.ProxyType), config.AgentConfig) + + proxy, err := newProxy(config.ProxyType) + if err != nil { + return nil, err + } + agent.setProxyFromGrpc(proxy, int(v.ProxyPid), v.ProxyURL) + + return &VM{ + id: v.Id, + hypervisor: hypervisor, + agent: agent, + proxy: proxy, + proxyPid: int(v.ProxyPid), + proxyURL: v.ProxyURL, + cpu: v.Cpu, + memory: v.Memory, + cpuDelta: v.CpuDelta, + }, nil +} + func buildVMSharePath(id string) string { return filepath.Join(store.RunVMStoragePath, id, "shared") } @@ -353,3 +464,23 @@ func (v *VM) assignSandbox(s *Sandbox) error { return nil } + +// ToGrpc convert VM struct to Grpc format pb.GrpcVM. +func (v *VM) ToGrpc(config VMConfig) (*pb.GrpcVM, error) { + hJSON, err := v.hypervisor.toGrpc() + if err != nil { + return nil, err + } + + return &pb.GrpcVM{ + Id: v.id, + Hypervisor: hJSON, + + ProxyPid: int64(v.proxyPid), + ProxyURL: v.proxyURL, + + Cpu: v.cpu, + Memory: v.memory, + CpuDelta: v.cpuDelta, + }, nil +} diff --git a/virtcontainers/vm_test.go b/virtcontainers/vm_test.go index 5fda0f98ab..90025d2381 100644 --- a/virtcontainers/vm_test.go +++ b/virtcontainers/vm_test.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "testing" + "github.com/kata-containers/runtime/virtcontainers/utils" "github.com/stretchr/testify/assert" ) @@ -110,3 +111,22 @@ func TestSetupProxy(t *testing.T) { _, _, _, err = setupProxy(hypervisor, agent, config, "foobar") assert.Nil(err) } + +func TestVMConfigGrpc(t *testing.T) { + assert := assert.New(t) + config := VMConfig{ + HypervisorType: QemuHypervisor, + HypervisorConfig: newQemuConfig(), + AgentType: KataContainersAgent, + AgentConfig: KataAgentConfig{false, true}, + ProxyType: NoopProxyType, + } + + p, err := config.ToGrpc() + assert.Nil(err) + + config2, err := GrpcToVMConfig(p) + assert.Nil(err) + + assert.True(utils.DeepCompare(config, *config2)) +}