factory: Make factory destroy can stop VMCache server

VMCache server just can be stopped by ctrl-c without current commit.

This commit adds a new gRPC function Quit to CacheService.  VMCache
server will stop by itself after 1 second when Quit is called.
Factory destroy will call gRPC Quit when VMCache is enabled.

Fixes: #1395

Signed-off-by: Hui Zhu <teawater@hyper.sh>
This commit is contained in:
Hui Zhu 2019-03-20 17:07:11 +08:00
parent 3343e9f7b3
commit f639787e02
3 changed files with 90 additions and 28 deletions

View File

@ -13,6 +13,7 @@ import (
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"syscall" "syscall"
"time"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
pb "github.com/kata-containers/runtime/protocols/cache" pb "github.com/kata-containers/runtime/protocols/cache"
@ -43,6 +44,7 @@ var factoryCLICommand = cli.Command{
type cacheServer struct { type cacheServer struct {
rpc *grpc.Server rpc *grpc.Server
factory vc.Factory factory vc.Factory
done chan struct{}
} }
var jsonVMConfig *pb.GrpcVMConfig var jsonVMConfig *pb.GrpcVMConfig
@ -74,6 +76,21 @@ func (s *cacheServer) GetBaseVM(ctx context.Context, empty *types.Empty) (*pb.Gr
return vm.ToGrpc(config) return vm.ToGrpc(config)
} }
func (s *cacheServer) quit() {
s.rpc.GracefulStop()
close(s.done)
}
// Quit will stop VMCache server after 1 second.
func (s *cacheServer) Quit(ctx context.Context, empty *types.Empty) (*types.Empty, error) {
go func() {
kataLog.Info("VM cache server will stop after 1 second")
time.Sleep(time.Second)
s.quit()
}()
return nil, nil
}
func getUnixListener(path string) (net.Listener, error) { func getUnixListener(path string) (net.Listener, error) {
err := os.MkdirAll(filepath.Dir(path), 0755) err := os.MkdirAll(filepath.Dir(path), 0755)
if err != nil { if err != nil {
@ -102,8 +119,8 @@ var handledSignals = []os.Signal{
syscall.SIGPIPE, syscall.SIGPIPE,
} }
func handleSignals(s *cacheServer, signals chan os.Signal) chan struct{} { func handleSignals(s *cacheServer, signals chan os.Signal) {
done := make(chan struct{}, 1) s.done = make(chan struct{}, 1)
go func() { go func() {
for { for {
sig := <-signals sig := <-signals
@ -112,13 +129,11 @@ func handleSignals(s *cacheServer, signals chan os.Signal) chan struct{} {
case unix.SIGPIPE: case unix.SIGPIPE:
continue continue
default: default:
s.rpc.GracefulStop() s.quit()
close(done)
return return
} }
} }
}() }()
return done
} }
var initFactoryCommand = cli.Command{ var initFactoryCommand = cli.Command{
@ -168,13 +183,13 @@ var initFactoryCommand = cli.Command{
defer l.Close() defer l.Close()
signals := make(chan os.Signal, 8) signals := make(chan os.Signal, 8)
done := handleSignals(s, signals) handleSignals(s, signals)
signal.Notify(signals, handledSignals...) signal.Notify(signals, handledSignals...)
kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start") kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start")
s.rpc.Serve(l) s.rpc.Serve(l)
<-done <-s.done
kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop") kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop")
return nil return nil
@ -221,7 +236,19 @@ var destroyFactoryCommand = cli.Command{
return errors.New("invalid runtime config") return errors.New("invalid runtime config")
} }
if runtimeConfig.FactoryConfig.Template { if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", runtimeConfig.FactoryConfig.VMCacheEndpoint), grpc.WithInsecure())
if err != nil {
return errors.Wrapf(err, "failed to connect %q", runtimeConfig.FactoryConfig.VMCacheEndpoint)
}
defer conn.Close()
_, err = pb.NewCacheServiceClient(conn).Quit(ctx, &types.Empty{})
if err != nil {
return errors.Wrapf(err, "failed to call gRPC Quit")
}
// Wait VMCache server stop
time.Sleep(time.Second)
} else if runtimeConfig.FactoryConfig.Template {
factoryConfig := vf.Config{ factoryConfig := vf.Config{
Template: true, Template: true,
VMConfig: vc.VMConfig{ VMConfig: vc.VMConfig{

View File

@ -142,6 +142,7 @@ const _ = grpc.SupportPackageIsVersion4
type CacheServiceClient interface { type CacheServiceClient interface {
Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error) Config(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVMConfig, error)
GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error) GetBaseVM(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*GrpcVM, error)
Quit(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error)
} }
type cacheServiceClient struct { type cacheServiceClient struct {
@ -170,11 +171,21 @@ func (c *cacheServiceClient) GetBaseVM(ctx context.Context, in *google_protobuf.
return out, nil return out, nil
} }
func (c *cacheServiceClient) Quit(ctx context.Context, in *google_protobuf.Empty, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
out := new(google_protobuf.Empty)
err := grpc.Invoke(ctx, "/cache.CacheService/Quit", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for CacheService service // Server API for CacheService service
type CacheServiceServer interface { type CacheServiceServer interface {
Config(context.Context, *google_protobuf.Empty) (*GrpcVMConfig, error) Config(context.Context, *google_protobuf.Empty) (*GrpcVMConfig, error)
GetBaseVM(context.Context, *google_protobuf.Empty) (*GrpcVM, error) GetBaseVM(context.Context, *google_protobuf.Empty) (*GrpcVM, error)
Quit(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error)
} }
func RegisterCacheServiceServer(s *grpc.Server, srv CacheServiceServer) { func RegisterCacheServiceServer(s *grpc.Server, srv CacheServiceServer) {
@ -217,6 +228,24 @@ func _CacheService_GetBaseVM_Handler(srv interface{}, ctx context.Context, dec f
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _CacheService_Quit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(google_protobuf.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(CacheServiceServer).Quit(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/cache.CacheService/Quit",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(CacheServiceServer).Quit(ctx, req.(*google_protobuf.Empty))
}
return interceptor(ctx, in, info, handler)
}
var _CacheService_serviceDesc = grpc.ServiceDesc{ var _CacheService_serviceDesc = grpc.ServiceDesc{
ServiceName: "cache.CacheService", ServiceName: "cache.CacheService",
HandlerType: (*CacheServiceServer)(nil), HandlerType: (*CacheServiceServer)(nil),
@ -229,6 +258,10 @@ var _CacheService_serviceDesc = grpc.ServiceDesc{
MethodName: "GetBaseVM", MethodName: "GetBaseVM",
Handler: _CacheService_GetBaseVM_Handler, Handler: _CacheService_GetBaseVM_Handler,
}, },
{
MethodName: "Quit",
Handler: _CacheService_Quit_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "cache.proto", Metadata: "cache.proto",
@ -821,24 +854,25 @@ var (
func init() { proto.RegisterFile("cache.proto", fileDescriptorCache) } func init() { proto.RegisterFile("cache.proto", fileDescriptorCache) }
var fileDescriptorCache = []byte{ var fileDescriptorCache = []byte{
// 299 bytes of a gzipped FileDescriptorProto // 312 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xd1, 0x6a, 0xc2, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xcf, 0x4a, 0xeb, 0x40,
0x18, 0x85, 0x49, 0xab, 0xdd, 0xfc, 0xad, 0x63, 0x64, 0x20, 0xa1, 0x83, 0x52, 0xbc, 0xea, 0x55, 0x18, 0xc5, 0x99, 0xfe, 0xc9, 0xbd, 0xfd, 0x9a, 0x8a, 0x8c, 0x50, 0x86, 0x0a, 0x21, 0x74, 0xd5,
0x05, 0xc7, 0x1e, 0x60, 0xda, 0xe1, 0xcd, 0x84, 0xd1, 0x31, 0xef, 0x6b, 0x1a, 0x6b, 0xc1, 0x9a, 0x55, 0x0a, 0x15, 0xdd, 0xdb, 0x46, 0xba, 0xb1, 0xa0, 0x11, 0xbb, 0x4f, 0x27, 0xd3, 0x74, 0xa0,
0x10, 0xd3, 0xb1, 0xbe, 0xd8, 0x9e, 0x61, 0x97, 0x7b, 0x84, 0xd1, 0x27, 0x19, 0x4d, 0xbb, 0xa2, 0xe9, 0x0c, 0xd3, 0x89, 0x98, 0xc7, 0x72, 0xe3, 0x33, 0xb8, 0xf4, 0x11, 0x24, 0x4f, 0x22, 0x99,
0x17, 0xde, 0xe5, 0x9c, 0xff, 0x9c, 0x43, 0xf8, 0x60, 0x48, 0x63, 0xba, 0x63, 0x81, 0x90, 0x5c, 0xc4, 0xd0, 0x2e, 0xb2, 0x9b, 0x73, 0xce, 0x77, 0x0e, 0xc3, 0x0f, 0xfa, 0x34, 0xa4, 0x3b, 0xe6,
0x71, 0xdc, 0xd7, 0xc2, 0xb9, 0x4f, 0x39, 0x4f, 0xf7, 0x6c, 0xaa, 0xcd, 0x4d, 0xb1, 0x9d, 0xb2, 0x49, 0x25, 0xb4, 0xc0, 0x5d, 0x23, 0x46, 0xd7, 0xb1, 0x10, 0xf1, 0x9e, 0x4d, 0x8d, 0xb9, 0x49,
0x5c, 0xa8, 0xb2, 0xc9, 0x4c, 0x42, 0xb0, 0x97, 0x52, 0xd0, 0xf5, 0x6a, 0xc1, 0x0f, 0xdb, 0x2c, 0xb7, 0x53, 0x96, 0x48, 0x9d, 0x95, 0x37, 0x63, 0x1f, 0xec, 0xa5, 0x92, 0x74, 0xbd, 0x5a, 0x88,
0xc5, 0x18, 0x7a, 0x61, 0xac, 0x62, 0x82, 0x3c, 0xe4, 0xdb, 0x91, 0x7e, 0x63, 0x0f, 0x86, 0x4f, 0xc3, 0x96, 0xc7, 0x18, 0x43, 0xc7, 0x0f, 0x75, 0x48, 0x90, 0x8b, 0x26, 0x76, 0x60, 0xde, 0xd8,
0x29, 0x3b, 0xa8, 0x26, 0x42, 0x0c, 0x7d, 0x3a, 0xb5, 0x26, 0x5f, 0x08, 0xac, 0x66, 0x06, 0xdf, 0x85, 0xfe, 0x7d, 0xcc, 0x0e, 0xba, 0x3c, 0x21, 0x2d, 0x13, 0x9d, 0x5a, 0xe3, 0x4f, 0x04, 0x56,
0x80, 0x91, 0x25, 0xba, 0x3e, 0x88, 0x8c, 0x2c, 0xc1, 0x2e, 0xc0, 0xae, 0x14, 0x4c, 0x7e, 0x64, 0x39, 0x83, 0x2f, 0xa0, 0xc5, 0x23, 0x53, 0xef, 0x05, 0x2d, 0x1e, 0x61, 0x07, 0x60, 0x97, 0x49,
0x47, 0x2e, 0xdb, 0xee, 0x89, 0x83, 0x1d, 0xb8, 0x16, 0x92, 0x7f, 0x96, 0xaf, 0x59, 0x42, 0x4c, 0xa6, 0xde, 0xf8, 0x51, 0xa8, 0xaa, 0x7b, 0xe2, 0xe0, 0x11, 0xfc, 0x97, 0x4a, 0xbc, 0x67, 0x4f,
0x0f, 0xf9, 0x66, 0xd4, 0xe9, 0xee, 0xf6, 0x1e, 0xbd, 0x90, 0x9e, 0x5e, 0xec, 0x34, 0xbe, 0x05, 0x3c, 0x22, 0x6d, 0x17, 0x4d, 0xda, 0x41, 0xad, 0xeb, 0xec, 0x35, 0x78, 0x24, 0x1d, 0xb3, 0x58,
0x93, 0x8a, 0x82, 0xf4, 0x3d, 0xe4, 0x8f, 0xa2, 0xfa, 0x89, 0xc7, 0x60, 0xe5, 0x2c, 0xe7, 0xb2, 0x6b, 0x7c, 0x09, 0x6d, 0x2a, 0x53, 0xd2, 0x75, 0xd1, 0x64, 0x10, 0x14, 0x4f, 0x3c, 0x04, 0x2b,
0x24, 0x96, 0x36, 0x5b, 0x55, 0xaf, 0x50, 0x51, 0x84, 0x6c, 0xaf, 0x62, 0x72, 0xa5, 0x2f, 0x9d, 0x61, 0x89, 0x50, 0x19, 0xb1, 0x8c, 0x59, 0xa9, 0x62, 0x85, 0xca, 0xd4, 0x67, 0x7b, 0x1d, 0x92,
0x9e, 0x95, 0x60, 0x2f, 0x6a, 0x48, 0x6f, 0xf5, 0x77, 0x28, 0xc3, 0x8f, 0x60, 0xb5, 0x20, 0xc6, 0x7f, 0x26, 0xa9, 0xf5, 0xec, 0x03, 0x81, 0xbd, 0x28, 0x28, 0xbd, 0x14, 0xff, 0xa1, 0x0c, 0xdf,
0x41, 0x83, 0x2d, 0xf8, 0xc7, 0x16, 0x3c, 0xd7, 0xd8, 0x9c, 0xbb, 0xa0, 0x41, 0x7c, 0x46, 0x6d, 0x82, 0x55, 0x91, 0x18, 0x7a, 0x25, 0x37, 0xef, 0x8f, 0x9b, 0xf7, 0x50, 0x70, 0x1b, 0x5d, 0x79,
0x06, 0x83, 0x25, 0x53, 0xf3, 0xf8, 0xc8, 0xd6, 0xab, 0x8b, 0xcd, 0xd1, 0x59, 0x73, 0x6e, 0x7f, 0x25, 0xe3, 0x33, 0x6c, 0x33, 0xe8, 0x2d, 0x99, 0x9e, 0x87, 0x47, 0xb6, 0x5e, 0x35, 0x36, 0x07,
0x57, 0x2e, 0xfa, 0xa9, 0x5c, 0xf4, 0x5b, 0xb9, 0x68, 0x63, 0xe9, 0xf0, 0xc3, 0x5f, 0x00, 0x00, 0x67, 0x4d, 0x7c, 0x07, 0x9d, 0xe7, 0x94, 0xeb, 0xc6, 0xf3, 0x06, 0x7f, 0x6e, 0x7f, 0xe5, 0x0e,
0x00, 0xff, 0xff, 0xb5, 0xef, 0x80, 0xc1, 0xc1, 0x01, 0x00, 0x00, 0xfa, 0xce, 0x1d, 0xf4, 0x93, 0x3b, 0x68, 0x63, 0x99, 0xf4, 0xe6, 0x37, 0x00, 0x00, 0xff, 0xff,
0x2a, 0x60, 0xa2, 0x5e, 0xfa, 0x01, 0x00, 0x00,
} }

View File

@ -13,6 +13,7 @@ import "google/protobuf/empty.proto";
service CacheService { service CacheService {
rpc Config(google.protobuf.Empty) returns (GrpcVMConfig); rpc Config(google.protobuf.Empty) returns (GrpcVMConfig);
rpc GetBaseVM(google.protobuf.Empty) returns (GrpcVM); rpc GetBaseVM(google.protobuf.Empty) returns (GrpcVM);
rpc Quit(google.protobuf.Empty) returns (google.protobuf.Empty);
} }
message GrpcVMConfig { message GrpcVMConfig {