change the protocols from grpc to ttrpc

Switch protocols from grpc to ttrpc

Fixes: #148

Signed-off-by: fupan.lfp <fupan.lfp@antfin.com>
This commit is contained in:
fupan.lfp
2020-05-08 19:21:52 +08:00
parent 010b7a9fba
commit 1d6e7ac405
3 changed files with 76 additions and 73 deletions

View File

@@ -39,7 +39,6 @@ import (
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
golangGrpc "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status" grpcStatus "google.golang.org/grpc/status"
) )
@@ -1898,96 +1897,96 @@ func (k *kataAgent) reseedRNG(data []byte) error {
return err return err
} }
type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error) type reqFunc func(context.Context, interface{}) (interface{}, error)
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers = make(map[string]reqFunc) k.reqHandlers = make(map[string]reqFunc)
k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.Check(ctx, req.(*grpc.CheckRequest), opts...) return k.client.HealthClient.Check(ctx, req.(*grpc.CheckRequest))
} }
k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ExecProcess(ctx, req.(*grpc.ExecProcessRequest), opts...) return k.client.AgentServiceClient.ExecProcess(ctx, req.(*grpc.ExecProcessRequest))
} }
k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest), opts...) return k.client.AgentServiceClient.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest))
} }
k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest), opts...) return k.client.AgentServiceClient.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest))
} }
k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.CreateContainer(ctx, req.(*grpc.CreateContainerRequest), opts...) return k.client.AgentServiceClient.CreateContainer(ctx, req.(*grpc.CreateContainerRequest))
} }
k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.StartContainer(ctx, req.(*grpc.StartContainerRequest), opts...) return k.client.AgentServiceClient.StartContainer(ctx, req.(*grpc.StartContainerRequest))
} }
k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest), opts...) return k.client.AgentServiceClient.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest))
} }
k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.SignalProcess(ctx, req.(*grpc.SignalProcessRequest), opts...) return k.client.AgentServiceClient.SignalProcess(ctx, req.(*grpc.SignalProcessRequest))
} }
k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest), opts...) return k.client.AgentServiceClient.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest))
} }
k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest), opts...) return k.client.AgentServiceClient.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest))
} }
k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest), opts...) return k.client.AgentServiceClient.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest))
} }
k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ListRoutes(ctx, req.(*grpc.ListRoutesRequest), opts...) return k.client.AgentServiceClient.ListRoutes(ctx, req.(*grpc.ListRoutesRequest))
} }
k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest), opts...) return k.client.AgentServiceClient.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest))
} }
k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcListProcessesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...) return k.client.AgentServiceClient.ListProcesses(ctx, req.(*grpc.ListProcessesRequest))
} }
k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest), opts...) return k.client.AgentServiceClient.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest))
} }
k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...) return k.client.AgentServiceClient.WaitProcess(ctx, req.(*grpc.WaitProcessRequest))
} }
k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...) return k.client.AgentServiceClient.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest))
} }
k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...) return k.client.AgentServiceClient.WriteStdin(ctx, req.(*grpc.WriteStreamRequest))
} }
k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...) return k.client.AgentServiceClient.CloseStdin(ctx, req.(*grpc.CloseStdinRequest))
} }
k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.StatsContainer(ctx, req.(*grpc.StatsContainerRequest), opts...) return k.client.AgentServiceClient.StatsContainer(ctx, req.(*grpc.StatsContainerRequest))
} }
k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.PauseContainer(ctx, req.(*grpc.PauseContainerRequest), opts...) return k.client.AgentServiceClient.PauseContainer(ctx, req.(*grpc.PauseContainerRequest))
} }
k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest), opts...) return k.client.AgentServiceClient.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest))
} }
k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest), opts...) return k.client.AgentServiceClient.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest))
} }
k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest), opts...) return k.client.AgentServiceClient.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest))
} }
k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest), opts...) return k.client.AgentServiceClient.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest))
} }
k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.CopyFile(ctx, req.(*grpc.CopyFileRequest), opts...) return k.client.AgentServiceClient.CopyFile(ctx, req.(*grpc.CopyFileRequest))
} }
k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest), opts...) return k.client.AgentServiceClient.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest))
} }
k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcStartTracingRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.StartTracing(ctx, req.(*grpc.StartTracingRequest), opts...) return k.client.AgentServiceClient.StartTracing(ctx, req.(*grpc.StartTracingRequest))
} }
k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers[grpcStopTracingRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.StopTracing(ctx, req.(*grpc.StopTracingRequest), opts...) return k.client.AgentServiceClient.StopTracing(ctx, req.(*grpc.StopTracingRequest))
} }
} }
@@ -2041,7 +2040,7 @@ func (k *kataAgent) readProcessStdout(c *Container, processID string, data []byt
defer k.disconnect() defer k.disconnect()
} }
return k.readProcessStream(c.id, processID, data, k.client.ReadStdout) return k.readProcessStream(c.id, processID, data, k.client.AgentServiceClient.ReadStdout)
} }
// readStdout and readStderr are special that we cannot differentiate them with the request types... // readStdout and readStderr are special that we cannot differentiate them with the request types...
@@ -2053,10 +2052,10 @@ func (k *kataAgent) readProcessStderr(c *Container, processID string, data []byt
defer k.disconnect() defer k.disconnect()
} }
return k.readProcessStream(c.id, processID, data, k.client.ReadStderr) return k.readProcessStream(c.id, processID, data, k.client.AgentServiceClient.ReadStderr)
} }
type readFn func(context.Context, *grpc.ReadStreamRequest, ...golangGrpc.CallOption) (*grpc.ReadStreamResponse, error) type readFn func(context.Context, *grpc.ReadStreamRequest) (*grpc.ReadStreamResponse, error)
func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) { func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) {
resp, err := read(k.ctx, &grpc.ReadStreamRequest{ resp, err := read(k.ctx, &grpc.ReadStreamRequest{

View File

@@ -20,10 +20,10 @@ import (
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations" vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/containerd/ttrpc"
gpb "github.com/gogo/protobuf/types" gpb "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/grpc"
aTypes "github.com/kata-containers/agent/pkg/types" aTypes "github.com/kata-containers/agent/pkg/types"
pb "github.com/kata-containers/agent/protocols/grpc" pb "github.com/kata-containers/agent/protocols/grpc"
@@ -242,11 +242,11 @@ func (p *gRPCProxy) MemHotplugByProbe(ctx context.Context, req *pb.MemHotplugByP
return &gpb.Empty{}, nil return &gpb.Empty{}, nil
} }
func gRPCRegister(s *grpc.Server, srv interface{}) { func gRPCRegister(s *ttrpc.Server, srv interface{}) {
switch g := srv.(type) { switch g := srv.(type) {
case *gRPCProxy: case *gRPCProxy:
pb.RegisterAgentServiceServer(s, g) pb.RegisterAgentServiceService(s, g)
pb.RegisterHealthServer(s, g) pb.RegisterHealthService(s, g)
} }
} }
@@ -358,7 +358,7 @@ func TestHandleEphemeralStorage(t *testing.T) {
ociMounts = append(ociMounts, mount) ociMounts = append(ociMounts, mount)
epheStorages := k.handleEphemeralStorage(ociMounts) epheStorages := k.handleEphemeralStorage(ociMounts)
epheMountPoint := epheStorages[0].GetMountPoint() epheMountPoint := epheStorages[0].MountPoint
expected := filepath.Join(ephemeralPath(), filepath.Base(mountSource)) expected := filepath.Join(ephemeralPath(), filepath.Base(mountSource))
assert.Equal(t, epheMountPoint, expected, assert.Equal(t, epheMountPoint, expected,
"Ephemeral mount point didn't match: got %s, expecting %s", epheMountPoint, expected) "Ephemeral mount point didn't match: got %s, expecting %s", epheMountPoint, expected)
@@ -383,7 +383,7 @@ func TestHandleLocalStorage(t *testing.T) {
assert.NotNil(t, localStorages) assert.NotNil(t, localStorages)
assert.Equal(t, len(localStorages), 1) assert.Equal(t, len(localStorages), 1)
localMountPoint := localStorages[0].GetMountPoint() localMountPoint := localStorages[0].MountPoint
expected := filepath.Join(kataGuestSharedDir(), sandboxID, rootfsSuffix, KataLocalDevType, filepath.Base(mountSource)) expected := filepath.Join(kataGuestSharedDir(), sandboxID, rootfsSuffix, KataLocalDevType, filepath.Base(mountSource))
assert.Equal(t, localMountPoint, expected) assert.Equal(t, localMountPoint, expected)
} }

View File

@@ -6,6 +6,7 @@
package mock package mock
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@@ -14,7 +15,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"google.golang.org/grpc" "github.com/containerd/ttrpc"
) )
// DefaultMockKataShimBinPath is populated at link time. // DefaultMockKataShimBinPath is populated at link time.
@@ -122,7 +123,7 @@ type ProxyGRPCMock struct {
// GRPCRegister is the registration routine for // GRPCRegister is the registration routine for
// the GRPC service. // the GRPC service.
GRPCRegister func(s *grpc.Server, srv interface{}) GRPCRegister func(s *ttrpc.Server, srv interface{})
listener net.Listener listener net.Listener
} }
@@ -194,11 +195,14 @@ func (p *ProxyGRPCMock) Start(URL string) error {
p.listener = l p.listener = l
grpcServer := grpc.NewServer() grpcServer, err := ttrpc.NewServer()
if err != nil {
return err
}
p.GRPCRegister(grpcServer, p.GRPCImplementer) p.GRPCRegister(grpcServer, p.GRPCImplementer)
go func() { go func() {
grpcServer.Serve(l) grpcServer.Serve(context.Background(), l)
}() }()
return nil return nil