1
0
mirror of https://github.com/haiwen/libsearpc.git synced 2025-07-30 20:44:40 +00:00

pysearpc: implemented named pipe client (and server, for testing)

This commit is contained in:
Shuai Lin 2018-08-09 17:34:29 +08:00
parent 0bf8150137
commit 13f186c90d
6 changed files with 262 additions and 30 deletions

View File

@ -1,4 +1,5 @@
from .common import SearpcError
from .client import SearpcClient, searpc_func, SearpcObjEncoder
from .server import searpc_server
from .transport import SearpcTransport, NamedPipeTransport
from .transport import SearpcTransport
from .named_pipe import NamedPipeServer, NamedPipeClient

7
pysearpc/errors.py Normal file
View File

@ -0,0 +1,7 @@
class NetworkError(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg

150
pysearpc/named_pipe.py Normal file
View File

@ -0,0 +1,150 @@
"""
RPC client/server implementation based on named pipe transport.
"""
import json
import logging
import os
import socket
import struct
from threading import Thread
from .client import SearpcClient
from .server import searpc_server
from .transport import SearpcTransport
from .utils import make_socket_closeonexec, recvall, sendall
logger = logging.getLogger(__name__)
class NamedPipeException(Exception):
pass
class NamedPipeTransport(SearpcTransport):
"""
This transport uses named pipes on windows and unix domain socket
on linux/mac.
It's compatible with the c implementation of named pipe transport.
in lib/searpc-named-pipe-transport.[ch] files.
The protocol is:
- request: <32b length header><json request>
- response: <32b length header><json response>
"""
def __init__(self, socket_path):
self.socket_path = socket_path
self.pipe_fd = None
def connect(self):
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
make_socket_closeonexec(self.pipe_fd)
self.pipe_fd.connect(self.socket_path)
def stop(self):
if self.pipe_fd:
self.pipe_fd.close()
self.pipe_fd = None
def send(self, service, fcall_str):
body = json.dumps({
'service': service,
'request': fcall_str,
})
# "I" for unsiged int
header = struct.pack('I', len(body))
sendall(self.pipe_fd, header)
sendall(self.pipe_fd, body)
resp_header = recvall(self.pipe_fd, 4)
# logger.info('resp_header is %s', resp_header)
resp_size, = struct.unpack('I', resp_header)
# logger.info('resp_size is %s', resp_size)
resp = recvall(self.pipe_fd, resp_size)
# logger.info('resp is %s', resp)
return resp
class NamedPipeClient(SearpcClient):
def __init__(self, socket_path, service_name):
self.socket_path = socket_path
self.service_name = service_name
self.transport = NamedPipeTransport(socket_path)
self.connected = False
def stop(self):
self.transport.stop()
def call_remote_func_sync(self, fcall_str):
if not self.connected:
self.transport.connect()
self.connected = True
return self.transport.send(self.service_name, fcall_str)
class NamedPipeServer(object):
"""
Searpc server based on named pipe transport. Note this server is
very basic and is written for testing purpose only.
"""
def __init__(self, socket_path):
self.socket_path = socket_path
self.pipe_fd = None
self.thread = Thread(target=self.accept_loop)
self.thread.setDaemon(True)
def start(self):
self.init_socket()
self.thread.start()
def stop(self):
pass
def init_socket(self):
if os.path.exists(self.socket_path):
try:
os.unlink(self.socket_path)
except OSError:
raise NamedPipeException(
'Failed to remove existing unix socket {}'.
format(self.socket_path)
)
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
make_socket_closeonexec(self.pipe_fd)
self.pipe_fd.bind(self.socket_path)
self.pipe_fd.listen(10)
logger.info('Server now listening at %s', self.socket_path)
def accept_loop(self):
logger.info('Waiting for clients')
while True:
connfd, _ = self.pipe_fd.accept()
logger.info('New pip client')
t = PipeHandlerThread(connfd)
t.start()
class PipeHandlerThread(Thread):
def __init__(self, pipe_fd):
Thread.__init__(self)
self.setDaemon(True)
self.pipe_fd = pipe_fd
def run(self):
while True:
req_header = recvall(self.pipe_fd, 4)
# logger.info('Got req header %s', req_header)
req_size, = struct.unpack('I', req_header)
# logger.info('req size is %s', req_size)
req = recvall(self.pipe_fd, req_size)
# logger.info('req is %s', req)
data = json.loads(req)
resp = searpc_server.call_function(data['service'], data['request'])
# logger.info('resp is %s', resp)
resp_header = struct.pack('I', len(resp))
sendall(self.pipe_fd, resp_header)
sendall(self.pipe_fd, resp)

View File

@ -2,6 +2,7 @@
#coding: UTF-8
import json
import logging
import os
import sys
import unittest
@ -10,11 +11,13 @@ from operator import add, mul
os.chdir(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, '..')
from pysearpc import (
SearpcClient, SearpcError, SearpcTransport, searpc_func, searpc_server, NamedPipeTransport
NamedPipeClient, NamedPipeServer, SearpcClient, SearpcError,
SearpcTransport, searpc_func, searpc_server
)
SVCNAME = 'test-service'
def init_server():
searpc_server.create_service(SVCNAME)
searpc_server.register_function(SVCNAME, add, 'add')
@ -22,18 +25,13 @@ def init_server():
class DummyTransport(SearpcTransport):
def send(self, fcall_str):
return searpc_server.call_function(SVCNAME, fcall_str)
def connect(self):
pass
def send(self, service, fcall_str):
return searpc_server.call_function(service, fcall_str)
class SampleRpcClient(SearpcClient):
def __init__(self):
self.transport = DummyTransport()
def call_remote_func_sync(self, fcall_str):
return self.transport.send(fcall_str)
class RpcMixin(object):
@searpc_func("int", ["int", "int"])
def add(self, x, y):
pass
@ -42,28 +40,64 @@ class SampleRpcClient(SearpcClient):
def multi(self, x, y):
pass
class DummyRpcClient(SearpcClient, RpcMixin):
def __init__(self):
self.transport = DummyTransport()
def call_remote_func_sync(self, fcall_str):
return self.transport.send(SVCNAME, fcall_str)
class NamedPipeClientForTest(NamedPipeClient, RpcMixin):
pass
SOCKET_PATH = '/tmp/libsearpc-test.sock'
class SearpcTest(unittest.TestCase):
def setUp(self):
@classmethod
def setUpClass(cls):
init_server()
self.client = SampleRpcClient()
cls.client = DummyRpcClient()
cls.named_pipe_server = NamedPipeServer(SOCKET_PATH)
cls.named_pipe_server.start()
cls.named_pipe_client = NamedPipeClientForTest(SOCKET_PATH, SVCNAME)
@classmethod
def tearDownClass(cls):
cls.named_pipe_client.stop()
cls.named_pipe_server.stop()
def test_normal_transport(self):
self.run_common()
self.run_common(self.client)
@unittest.skip('not implemented yet')
# @unittest.skip('not implemented yet')
def test_pipe_transport(self):
self.client.transport = NamedPipeTransport('/tmp/libsearpc-test.sock')
self.run_common()
self.run_common(self.named_pipe_client)
def run_common(self):
v = self.client.add(1, 2)
def run_common(self, client):
v = client.add(1, 2)
self.assertEqual(v, 3)
v = self.client.multi(1, 2)
v = client.multi(1, 2)
self.assertEqual(v, 2)
v = self.client.multi('abc', 2)
v = client.multi('abc', 2)
self.assertEqual(v, 'abcabc')
def setup_logging(level=logging.INFO):
kw = {
# 'format': '[%(asctime)s][%(pathname)s]: %(message)s',
'format': '[%(asctime)s][%(module)s]: %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': level,
'stream': sys.stdout
}
logging.basicConfig(**kw)
if __name__ == '__main__':
setup_logging()
unittest.main()

View File

@ -4,13 +4,8 @@ class SearpcTransport(object):
A transport is repsonsible to send the serialized request to the
server, and get back the raw response from the server.
"""
def send(self, request_str):
def connect(self):
raise NotImplementedError
class NamedPipeTransport(SearpcTransport):
def __init__(self, pipe_path):
self.pipe_path = pipe_path
def send(self, fcall_str):
pass
def send(self, service_name, request_str):
raise NotImplementedError

45
pysearpc/utils.py Normal file
View File

@ -0,0 +1,45 @@
import os
import socket
from pysearpc.errors import NetworkError
def recvall(fd, total):
remain = total
data = ''
while remain > 0:
try:
new = fd.recv(remain)
except socket.error as e:
raise NetworkError('Failed to read from socket: %s' % e)
n = len(new)
if n <= 0:
raise NetworkError("Failed to read from socket")
else:
data += new
remain -= n
return data
def sendall(fd, data):
total = len(data)
offset = 0
while offset < total:
try:
n = fd.send(data[offset:])
except socket.error as e:
raise NetworkError('Failed to write to socket: %s' % e)
if n <= 0:
raise NetworkError('Failed to write to socket')
else:
offset += n
def is_win32():
return os.name == 'nt'
def make_socket_closeonexec(fd):
if not is_win32():
import fcntl
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)