diff --git a/pysearpc/__init__.py b/pysearpc/__init__.py index af60bd6..984d31a 100644 --- a/pysearpc/__init__.py +++ b/pysearpc/__init__.py @@ -1,4 +1,5 @@ from .common import SearpcError from .client import SearpcClient, searpc_func, SearpcObjEncoder from .server import searpc_server -from .transport import SearpcTransport, NamedPipeTransport +from .transport import SearpcTransport +from .named_pipe import NamedPipeServer, NamedPipeClient diff --git a/pysearpc/errors.py b/pysearpc/errors.py new file mode 100644 index 0000000..f7caa70 --- /dev/null +++ b/pysearpc/errors.py @@ -0,0 +1,7 @@ +class NetworkError(Exception): + def __init__(self, msg): + Exception.__init__(self) + self.msg = msg + + def __str__(self): + return self.msg diff --git a/pysearpc/named_pipe.py b/pysearpc/named_pipe.py new file mode 100644 index 0000000..7d62498 --- /dev/null +++ b/pysearpc/named_pipe.py @@ -0,0 +1,150 @@ +""" +RPC client/server implementation based on named pipe transport. +""" + +import json +import logging +import os +import socket +import struct +from threading import Thread + +from .client import SearpcClient +from .server import searpc_server +from .transport import SearpcTransport +from .utils import make_socket_closeonexec, recvall, sendall + +logger = logging.getLogger(__name__) + + +class NamedPipeException(Exception): + pass + + +class NamedPipeTransport(SearpcTransport): + """ + This transport uses named pipes on windows and unix domain socket + on linux/mac. + + It's compatible with the c implementation of named pipe transport. + in lib/searpc-named-pipe-transport.[ch] files. + + The protocol is: + - request: <32b length header> + - response: <32b length header> + """ + + def __init__(self, socket_path): + self.socket_path = socket_path + self.pipe_fd = None + + def connect(self): + self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + make_socket_closeonexec(self.pipe_fd) + self.pipe_fd.connect(self.socket_path) + + def stop(self): + if self.pipe_fd: + self.pipe_fd.close() + self.pipe_fd = None + + def send(self, service, fcall_str): + body = json.dumps({ + 'service': service, + 'request': fcall_str, + }) + # "I" for unsiged int + header = struct.pack('I', len(body)) + sendall(self.pipe_fd, header) + sendall(self.pipe_fd, body) + + resp_header = recvall(self.pipe_fd, 4) + # logger.info('resp_header is %s', resp_header) + resp_size, = struct.unpack('I', resp_header) + # logger.info('resp_size is %s', resp_size) + resp = recvall(self.pipe_fd, resp_size) + # logger.info('resp is %s', resp) + return resp + + +class NamedPipeClient(SearpcClient): + def __init__(self, socket_path, service_name): + self.socket_path = socket_path + self.service_name = service_name + self.transport = NamedPipeTransport(socket_path) + self.connected = False + + def stop(self): + self.transport.stop() + + def call_remote_func_sync(self, fcall_str): + if not self.connected: + self.transport.connect() + self.connected = True + return self.transport.send(self.service_name, fcall_str) + + +class NamedPipeServer(object): + """ + Searpc server based on named pipe transport. Note this server is + very basic and is written for testing purpose only. + """ + def __init__(self, socket_path): + self.socket_path = socket_path + self.pipe_fd = None + self.thread = Thread(target=self.accept_loop) + self.thread.setDaemon(True) + + def start(self): + self.init_socket() + self.thread.start() + + def stop(self): + pass + + def init_socket(self): + if os.path.exists(self.socket_path): + try: + os.unlink(self.socket_path) + except OSError: + raise NamedPipeException( + 'Failed to remove existing unix socket {}'. + format(self.socket_path) + ) + self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + make_socket_closeonexec(self.pipe_fd) + self.pipe_fd.bind(self.socket_path) + self.pipe_fd.listen(10) + logger.info('Server now listening at %s', self.socket_path) + + def accept_loop(self): + logger.info('Waiting for clients') + while True: + connfd, _ = self.pipe_fd.accept() + logger.info('New pip client') + t = PipeHandlerThread(connfd) + t.start() + + +class PipeHandlerThread(Thread): + def __init__(self, pipe_fd): + Thread.__init__(self) + self.setDaemon(True) + self.pipe_fd = pipe_fd + + def run(self): + while True: + req_header = recvall(self.pipe_fd, 4) + # logger.info('Got req header %s', req_header) + req_size, = struct.unpack('I', req_header) + # logger.info('req size is %s', req_size) + req = recvall(self.pipe_fd, req_size) + # logger.info('req is %s', req) + + data = json.loads(req) + resp = searpc_server.call_function(data['service'], data['request']) + # logger.info('resp is %s', resp) + + resp_header = struct.pack('I', len(resp)) + sendall(self.pipe_fd, resp_header) + sendall(self.pipe_fd, resp) diff --git a/pysearpc/test_pysearpc.py b/pysearpc/test_pysearpc.py index d1deb8f..cc1e236 100755 --- a/pysearpc/test_pysearpc.py +++ b/pysearpc/test_pysearpc.py @@ -2,6 +2,7 @@ #coding: UTF-8 import json +import logging import os import sys import unittest @@ -10,11 +11,13 @@ from operator import add, mul os.chdir(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, '..') from pysearpc import ( - SearpcClient, SearpcError, SearpcTransport, searpc_func, searpc_server, NamedPipeTransport + NamedPipeClient, NamedPipeServer, SearpcClient, SearpcError, + SearpcTransport, searpc_func, searpc_server ) SVCNAME = 'test-service' + def init_server(): searpc_server.create_service(SVCNAME) searpc_server.register_function(SVCNAME, add, 'add') @@ -22,18 +25,13 @@ def init_server(): class DummyTransport(SearpcTransport): - def send(self, fcall_str): - return searpc_server.call_function(SVCNAME, fcall_str) + def connect(self): + pass + def send(self, service, fcall_str): + return searpc_server.call_function(service, fcall_str) -class SampleRpcClient(SearpcClient): - - def __init__(self): - self.transport = DummyTransport() - - def call_remote_func_sync(self, fcall_str): - return self.transport.send(fcall_str) - +class RpcMixin(object): @searpc_func("int", ["int", "int"]) def add(self, x, y): pass @@ -42,28 +40,64 @@ class SampleRpcClient(SearpcClient): def multi(self, x, y): pass +class DummyRpcClient(SearpcClient, RpcMixin): + def __init__(self): + self.transport = DummyTransport() + + def call_remote_func_sync(self, fcall_str): + return self.transport.send(SVCNAME, fcall_str) + +class NamedPipeClientForTest(NamedPipeClient, RpcMixin): + pass + + +SOCKET_PATH = '/tmp/libsearpc-test.sock' + + class SearpcTest(unittest.TestCase): - def setUp(self): + @classmethod + def setUpClass(cls): init_server() - self.client = SampleRpcClient() + cls.client = DummyRpcClient() + + cls.named_pipe_server = NamedPipeServer(SOCKET_PATH) + cls.named_pipe_server.start() + cls.named_pipe_client = NamedPipeClientForTest(SOCKET_PATH, SVCNAME) + + @classmethod + def tearDownClass(cls): + cls.named_pipe_client.stop() + cls.named_pipe_server.stop() def test_normal_transport(self): - self.run_common() + self.run_common(self.client) - @unittest.skip('not implemented yet') + # @unittest.skip('not implemented yet') def test_pipe_transport(self): - self.client.transport = NamedPipeTransport('/tmp/libsearpc-test.sock') - self.run_common() + self.run_common(self.named_pipe_client) - def run_common(self): - v = self.client.add(1, 2) + def run_common(self, client): + v = client.add(1, 2) self.assertEqual(v, 3) - v = self.client.multi(1, 2) + v = client.multi(1, 2) self.assertEqual(v, 2) - v = self.client.multi('abc', 2) + v = client.multi('abc', 2) self.assertEqual(v, 'abcabc') +def setup_logging(level=logging.INFO): + kw = { + # 'format': '[%(asctime)s][%(pathname)s]: %(message)s', + 'format': '[%(asctime)s][%(module)s]: %(message)s', + 'datefmt': '%m/%d/%Y %H:%M:%S', + 'level': level, + 'stream': sys.stdout + } + + logging.basicConfig(**kw) + + if __name__ == '__main__': + setup_logging() unittest.main() diff --git a/pysearpc/transport.py b/pysearpc/transport.py index 0fe4a4a..60285cb 100644 --- a/pysearpc/transport.py +++ b/pysearpc/transport.py @@ -4,13 +4,8 @@ class SearpcTransport(object): A transport is repsonsible to send the serialized request to the server, and get back the raw response from the server. """ - def send(self, request_str): + def connect(self): raise NotImplementedError - -class NamedPipeTransport(SearpcTransport): - def __init__(self, pipe_path): - self.pipe_path = pipe_path - - def send(self, fcall_str): - pass + def send(self, service_name, request_str): + raise NotImplementedError diff --git a/pysearpc/utils.py b/pysearpc/utils.py new file mode 100644 index 0000000..7c3739b --- /dev/null +++ b/pysearpc/utils.py @@ -0,0 +1,45 @@ +import os +import socket + +from pysearpc.errors import NetworkError + +def recvall(fd, total): + remain = total + data = '' + while remain > 0: + try: + new = fd.recv(remain) + except socket.error as e: + raise NetworkError('Failed to read from socket: %s' % e) + + n = len(new) + if n <= 0: + raise NetworkError("Failed to read from socket") + else: + data += new + remain -= n + + return data + +def sendall(fd, data): + total = len(data) + offset = 0 + while offset < total: + try: + n = fd.send(data[offset:]) + except socket.error as e: + raise NetworkError('Failed to write to socket: %s' % e) + + if n <= 0: + raise NetworkError('Failed to write to socket') + else: + offset += n + +def is_win32(): + return os.name == 'nt' + +def make_socket_closeonexec(fd): + if not is_win32(): + import fcntl + old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) + fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)