1
0
mirror of https://github.com/haiwen/libsearpc.git synced 2025-09-17 23:37:46 +00:00

Merge pull request #32 from haiwen/named-pipe-python

pysearpc: added named pipe client
This commit is contained in:
Jiaqiang Xu
2018-08-14 18:22:18 +08:00
committed by GitHub
7 changed files with 318 additions and 16 deletions

View File

@@ -1,5 +1,5 @@
sudo: false
language: c
language: python
compiler:
- gcc
- clang
@@ -12,7 +12,8 @@ before_install:
- ./autogen.sh
script:
- ./configure
- make -j8
- make check -j8
- make -j4
- make check -j4
- python pysearpc/test_pysearpc.py
notifications:
email: false

View File

@@ -1,3 +1,5 @@
from common import SearpcError
from client import SearpcClient, searpc_func, SearpcObjEncoder
from server import searpc_server
from .common import SearpcError
from .client import SearpcClient, searpc_func, SearpcObjEncoder
from .server import searpc_server
from .transport import SearpcTransport
from .named_pipe import NamedPipeServer, NamedPipeClient

7
pysearpc/errors.py Normal file
View File

@@ -0,0 +1,7 @@
class NetworkError(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg

150
pysearpc/named_pipe.py Normal file
View File

@@ -0,0 +1,150 @@
"""
RPC client/server implementation based on named pipe transport.
"""
import json
import logging
import os
import socket
import struct
from threading import Thread
from .client import SearpcClient
from .server import searpc_server
from .transport import SearpcTransport
from .utils import make_socket_closeonexec, recvall, sendall
logger = logging.getLogger(__name__)
class NamedPipeException(Exception):
pass
class NamedPipeTransport(SearpcTransport):
"""
This transport uses named pipes on windows and unix domain socket
on linux/mac.
It's compatible with the c implementation of named pipe transport.
in lib/searpc-named-pipe-transport.[ch] files.
The protocol is:
- request: <32b length header><json request>
- response: <32b length header><json response>
"""
def __init__(self, socket_path):
self.socket_path = socket_path
self.pipe_fd = None
def connect(self):
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
make_socket_closeonexec(self.pipe_fd)
self.pipe_fd.connect(self.socket_path)
def stop(self):
if self.pipe_fd:
self.pipe_fd.close()
self.pipe_fd = None
def send(self, service, fcall_str):
body = json.dumps({
'service': service,
'request': fcall_str,
})
# "I" for unsiged int
header = struct.pack('I', len(body))
sendall(self.pipe_fd, header)
sendall(self.pipe_fd, body)
resp_header = recvall(self.pipe_fd, 4)
# logger.info('resp_header is %s', resp_header)
resp_size, = struct.unpack('I', resp_header)
# logger.info('resp_size is %s', resp_size)
resp = recvall(self.pipe_fd, resp_size)
# logger.info('resp is %s', resp)
return resp
class NamedPipeClient(SearpcClient):
def __init__(self, socket_path, service_name):
self.socket_path = socket_path
self.service_name = service_name
self.transport = NamedPipeTransport(socket_path)
self.connected = False
def stop(self):
self.transport.stop()
def call_remote_func_sync(self, fcall_str):
if not self.connected:
self.transport.connect()
self.connected = True
return self.transport.send(self.service_name, fcall_str)
class NamedPipeServer(object):
"""
Searpc server based on named pipe transport. Note this server is
very basic and is written for testing purpose only.
"""
def __init__(self, socket_path):
self.socket_path = socket_path
self.pipe_fd = None
self.thread = Thread(target=self.accept_loop)
self.thread.setDaemon(True)
def start(self):
self.init_socket()
self.thread.start()
def stop(self):
pass
def init_socket(self):
if os.path.exists(self.socket_path):
try:
os.unlink(self.socket_path)
except OSError:
raise NamedPipeException(
'Failed to remove existing unix socket {}'.
format(self.socket_path)
)
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
make_socket_closeonexec(self.pipe_fd)
self.pipe_fd.bind(self.socket_path)
self.pipe_fd.listen(10)
logger.info('Server now listening at %s', self.socket_path)
def accept_loop(self):
logger.info('Waiting for clients')
while True:
connfd, _ = self.pipe_fd.accept()
logger.info('New pip client')
t = PipeHandlerThread(connfd)
t.start()
class PipeHandlerThread(Thread):
def __init__(self, pipe_fd):
Thread.__init__(self)
self.setDaemon(True)
self.pipe_fd = pipe_fd
def run(self):
while True:
req_header = recvall(self.pipe_fd, 4)
# logger.info('Got req header %s', req_header)
req_size, = struct.unpack('I', req_header)
# logger.info('req size is %s', req_size)
req = recvall(self.pipe_fd, req_size)
# logger.info('req is %s', req)
data = json.loads(req)
resp = searpc_server.call_function(data['service'], data['request'])
# logger.info('resp is %s', resp)
resp_header = struct.pack('I', len(resp))
sendall(self.pipe_fd, resp_header)
sendall(self.pipe_fd, resp)

106
pysearpc/test_pysearpc.py Normal file → Executable file
View File

@@ -1,17 +1,103 @@
#!/usr/bin/env python
#coding: UTF-8
import json
import logging
import os
import sys
sys.path += ['..']
import unittest
from operator import add, mul
from pysearpc import SearpcClient, searpc_func, SearpcError
os.chdir(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, '..')
from pysearpc import (
NamedPipeClient, NamedPipeServer, SearpcClient, SearpcError,
SearpcTransport, searpc_func, searpc_server
)
class SampleRpcClient(SearpcClient):
def call_remote_func_sync(self, fcall_str):
return ""
SVCNAME = 'test-service'
@searpc_func("void", ["string", "int"])
def list_peers(self):
def init_server():
searpc_server.create_service(SVCNAME)
searpc_server.register_function(SVCNAME, add, 'add')
searpc_server.register_function(SVCNAME, mul, 'multi')
class DummyTransport(SearpcTransport):
def connect(self):
pass
client = SampleRpcClient()
client.list_peers("id", 10)
def send(self, service, fcall_str):
return searpc_server.call_function(service, fcall_str)
class RpcMixin(object):
@searpc_func("int", ["int", "int"])
def add(self, x, y):
pass
@searpc_func("string", ["string", "int"])
def multi(self, x, y):
pass
class DummyRpcClient(SearpcClient, RpcMixin):
def __init__(self):
self.transport = DummyTransport()
def call_remote_func_sync(self, fcall_str):
return self.transport.send(SVCNAME, fcall_str)
class NamedPipeClientForTest(NamedPipeClient, RpcMixin):
pass
SOCKET_PATH = '/tmp/libsearpc-test.sock'
class SearpcTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
init_server()
cls.client = DummyRpcClient()
cls.named_pipe_server = NamedPipeServer(SOCKET_PATH)
cls.named_pipe_server.start()
cls.named_pipe_client = NamedPipeClientForTest(SOCKET_PATH, SVCNAME)
@classmethod
def tearDownClass(cls):
cls.named_pipe_client.stop()
cls.named_pipe_server.stop()
def test_normal_transport(self):
self.run_common(self.client)
# @unittest.skip('not implemented yet')
def test_pipe_transport(self):
self.run_common(self.named_pipe_client)
def run_common(self, client):
v = client.add(1, 2)
self.assertEqual(v, 3)
v = client.multi(1, 2)
self.assertEqual(v, 2)
v = client.multi('abc', 2)
self.assertEqual(v, 'abcabc')
def setup_logging(level=logging.INFO):
kw = {
# 'format': '[%(asctime)s][%(pathname)s]: %(message)s',
'format': '[%(asctime)s][%(module)s]: %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': level,
'stream': sys.stdout
}
logging.basicConfig(**kw)
if __name__ == '__main__':
setup_logging()
unittest.main()

11
pysearpc/transport.py Normal file
View File

@@ -0,0 +1,11 @@
class SearpcTransport(object):
"""
A transport is repsonsible to send the serialized request to the
server, and get back the raw response from the server.
"""
def connect(self):
raise NotImplementedError
def send(self, service_name, request_str):
raise NotImplementedError

45
pysearpc/utils.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import socket
from pysearpc.errors import NetworkError
def recvall(fd, total):
remain = total
data = ''
while remain > 0:
try:
new = fd.recv(remain)
except socket.error as e:
raise NetworkError('Failed to read from socket: %s' % e)
n = len(new)
if n <= 0:
raise NetworkError("Failed to read from socket")
else:
data += new
remain -= n
return data
def sendall(fd, data):
total = len(data)
offset = 0
while offset < total:
try:
n = fd.send(data[offset:])
except socket.error as e:
raise NetworkError('Failed to write to socket: %s' % e)
if n <= 0:
raise NetworkError('Failed to write to socket')
else:
offset += n
def is_win32():
return os.name == 'nt'
def make_socket_closeonexec(fd):
if not is_win32():
import fcntl
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)