2018-08-09 09:34:29 +00:00
|
|
|
"""
|
|
|
|
RPC client/server implementation based on named pipe transport.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import json
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import socket
|
|
|
|
import struct
|
|
|
|
from threading import Thread
|
2019-06-25 08:42:13 +00:00
|
|
|
import queue
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
from .client import SearpcClient
|
|
|
|
from .server import searpc_server
|
|
|
|
from .transport import SearpcTransport
|
|
|
|
from .utils import make_socket_closeonexec, recvall, sendall
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class NamedPipeException(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class NamedPipeTransport(SearpcTransport):
|
|
|
|
"""
|
|
|
|
This transport uses named pipes on windows and unix domain socket
|
|
|
|
on linux/mac.
|
|
|
|
|
|
|
|
It's compatible with the c implementation of named pipe transport.
|
|
|
|
in lib/searpc-named-pipe-transport.[ch] files.
|
|
|
|
|
|
|
|
The protocol is:
|
|
|
|
- request: <32b length header><json request>
|
|
|
|
- response: <32b length header><json response>
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, socket_path):
|
|
|
|
self.socket_path = socket_path
|
2019-07-01 07:39:26 +00:00
|
|
|
self.pipe = None
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
def connect(self):
|
2019-07-01 07:39:26 +00:00
|
|
|
self.pipe = socket.socket(socket.AF_UNIX)
|
|
|
|
self.pipe.connect(self.socket_path)
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
def stop(self):
|
2019-07-01 07:39:26 +00:00
|
|
|
if self.pipe:
|
|
|
|
self.pipe.close()
|
|
|
|
self.pipe = None
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
def send(self, service, fcall_str):
|
|
|
|
body = json.dumps({
|
|
|
|
'service': service,
|
|
|
|
'request': fcall_str,
|
|
|
|
})
|
2019-07-01 07:39:26 +00:00
|
|
|
body_utf8 = body.encode(encoding='utf-8')
|
2018-08-09 09:34:29 +00:00
|
|
|
# "I" for unsiged int
|
2019-07-01 07:39:26 +00:00
|
|
|
header = struct.pack('=I', len(body_utf8))
|
|
|
|
sendall(self.pipe, header)
|
|
|
|
sendall(self.pipe, body_utf8)
|
2018-08-09 09:34:29 +00:00
|
|
|
|
2019-07-01 07:39:26 +00:00
|
|
|
resp_header = recvall(self.pipe, 4)
|
2018-08-09 09:34:29 +00:00
|
|
|
# logger.info('resp_header is %s', resp_header)
|
2019-07-01 07:39:26 +00:00
|
|
|
resp_size, = struct.unpack('=I', resp_header)
|
2018-08-09 09:34:29 +00:00
|
|
|
# logger.info('resp_size is %s', resp_size)
|
2019-07-01 07:39:26 +00:00
|
|
|
resp = recvall(self.pipe, resp_size)
|
2018-08-09 09:34:29 +00:00
|
|
|
# logger.info('resp is %s', resp)
|
2019-06-06 09:36:19 +00:00
|
|
|
return resp.decode(encoding='utf-8')
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
class NamedPipeClient(SearpcClient):
|
2019-06-25 08:42:13 +00:00
|
|
|
def __init__(self, socket_path, service_name, pool_size=5):
|
2018-08-09 09:34:29 +00:00
|
|
|
self.socket_path = socket_path
|
|
|
|
self.service_name = service_name
|
2019-06-25 08:42:13 +00:00
|
|
|
self.pool_size = pool_size
|
|
|
|
self._pool = queue.Queue(pool_size)
|
|
|
|
|
|
|
|
def _create_transport(self):
|
|
|
|
transport = NamedPipeTransport(self.socket_path)
|
|
|
|
transport.connect()
|
|
|
|
return transport
|
|
|
|
|
|
|
|
def _get_transport(self):
|
|
|
|
try:
|
|
|
|
transport = self._pool.get(False)
|
|
|
|
except:
|
|
|
|
transport = self._create_transport()
|
|
|
|
return transport
|
|
|
|
|
|
|
|
def _return_transport(self, transport):
|
|
|
|
try:
|
|
|
|
self._pool.put(transport, False)
|
|
|
|
except queue.Full:
|
|
|
|
transport.stop()
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
def call_remote_func_sync(self, fcall_str):
|
2019-06-25 08:42:13 +00:00
|
|
|
transport = self._get_transport()
|
|
|
|
ret_str = transport.send(self.service_name, fcall_str)
|
|
|
|
self._return_transport(transport)
|
|
|
|
return ret_str
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
class NamedPipeServer(object):
|
|
|
|
"""
|
|
|
|
Searpc server based on named pipe transport. Note this server is
|
|
|
|
very basic and is written for testing purpose only.
|
|
|
|
"""
|
|
|
|
def __init__(self, socket_path):
|
|
|
|
self.socket_path = socket_path
|
2019-07-01 07:39:26 +00:00
|
|
|
self.pipe = None
|
2018-08-09 09:34:29 +00:00
|
|
|
self.thread = Thread(target=self.accept_loop)
|
|
|
|
self.thread.setDaemon(True)
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
self.init_socket()
|
|
|
|
self.thread.start()
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
pass
|
|
|
|
|
|
|
|
def init_socket(self):
|
|
|
|
if os.path.exists(self.socket_path):
|
|
|
|
try:
|
|
|
|
os.unlink(self.socket_path)
|
|
|
|
except OSError:
|
|
|
|
raise NamedPipeException(
|
|
|
|
'Failed to remove existing unix socket {}'.
|
|
|
|
format(self.socket_path)
|
|
|
|
)
|
2019-07-01 07:39:26 +00:00
|
|
|
self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
|
|
|
make_socket_closeonexec(self.pipe)
|
|
|
|
self.pipe.bind(self.socket_path)
|
|
|
|
self.pipe.listen(10)
|
2018-08-09 09:34:29 +00:00
|
|
|
logger.info('Server now listening at %s', self.socket_path)
|
|
|
|
|
|
|
|
def accept_loop(self):
|
|
|
|
logger.info('Waiting for clients')
|
|
|
|
while True:
|
2019-07-01 07:39:26 +00:00
|
|
|
connfd, _ = self.pipe.accept()
|
2018-08-09 09:34:29 +00:00
|
|
|
logger.info('New pip client')
|
|
|
|
t = PipeHandlerThread(connfd)
|
|
|
|
t.start()
|
|
|
|
|
|
|
|
|
|
|
|
class PipeHandlerThread(Thread):
|
2019-07-01 07:39:26 +00:00
|
|
|
def __init__(self, pipe):
|
2018-08-09 09:34:29 +00:00
|
|
|
Thread.__init__(self)
|
|
|
|
self.setDaemon(True)
|
2019-07-01 07:39:26 +00:00
|
|
|
self.pipe = pipe
|
2018-08-09 09:34:29 +00:00
|
|
|
|
|
|
|
def run(self):
|
|
|
|
while True:
|
2019-07-01 07:39:26 +00:00
|
|
|
req_header = recvall(self.pipe, 4)
|
2018-08-09 09:34:29 +00:00
|
|
|
# logger.info('Got req header %s', req_header)
|
|
|
|
req_size, = struct.unpack('I', req_header)
|
|
|
|
# logger.info('req size is %s', req_size)
|
2019-07-01 07:39:26 +00:00
|
|
|
req = recvall(self.pipe, req_size)
|
2018-08-09 09:34:29 +00:00
|
|
|
# logger.info('req is %s', req)
|
|
|
|
|
2019-06-06 09:36:19 +00:00
|
|
|
data = json.loads(req.decode(encoding='utf-8'))
|
2018-08-09 09:34:29 +00:00
|
|
|
resp = searpc_server.call_function(data['service'], data['request'])
|
|
|
|
# logger.info('resp is %s', resp)
|
|
|
|
|
|
|
|
resp_header = struct.pack('I', len(resp))
|
2019-07-01 07:39:26 +00:00
|
|
|
sendall(self.pipe, resp_header)
|
|
|
|
sendall(self.pipe, resp.encode(encoding='utf-8'))
|