""" RPC client/server implementation based on named pipe transport. """ from future import standard_library standard_library.install_aliases() from builtins import object import json import logging import os import socket import struct from threading import Thread import queue from .client import SearpcClient from .server import searpc_server from .transport import SearpcTransport from .utils import make_socket_closeonexec, recvall, sendall logger = logging.getLogger(__name__) class NamedPipeException(Exception): pass class NamedPipeTransport(SearpcTransport): """ This transport uses named pipes on windows and unix domain socket on linux/mac. It's compatible with the c implementation of named pipe transport. in lib/searpc-named-pipe-transport.[ch] files. The protocol is: - request: <32b length header> - response: <32b length header> """ def __init__(self, socket_path): self.socket_path = socket_path self.pipe = None def connect(self): self.pipe = socket.socket(socket.AF_UNIX) self.pipe.connect(self.socket_path) def stop(self): if self.pipe: self.pipe.close() self.pipe = None def send(self, service, fcall_str): body = json.dumps({ 'service': service, 'request': fcall_str, }) body_utf8 = body.encode(encoding='utf-8') # "I" for unsiged int header = struct.pack('=I', len(body_utf8)) sendall(self.pipe, header) sendall(self.pipe, body_utf8) resp_header = recvall(self.pipe, 4) # logger.info('resp_header is %s', resp_header) resp_size, = struct.unpack('=I', resp_header) # logger.info('resp_size is %s', resp_size) resp = recvall(self.pipe, resp_size) # logger.info('resp is %s', resp) return resp.decode(encoding='utf-8') class NamedPipeClient(SearpcClient): def __init__(self, socket_path, service_name, pool_size=5): self.socket_path = socket_path self.service_name = service_name self.pool_size = pool_size self._pool = queue.Queue(pool_size) def _create_transport(self): transport = NamedPipeTransport(self.socket_path) transport.connect() return transport def _get_transport(self): try: transport = self._pool.get(False) except: transport = self._create_transport() return transport def _return_transport(self, transport): try: self._pool.put(transport, False) except queue.Full: transport.stop() def call_remote_func_sync(self, fcall_str): transport = self._get_transport() ret_str = transport.send(self.service_name, fcall_str) self._return_transport(transport) return ret_str class NamedPipeServer(object): """ Searpc server based on named pipe transport. Note this server is very basic and is written for testing purpose only. """ def __init__(self, socket_path): self.socket_path = socket_path self.pipe = None self.thread = Thread(target=self.accept_loop) self.thread.setDaemon(True) def start(self): self.init_socket() self.thread.start() def stop(self): pass def init_socket(self): if os.path.exists(self.socket_path): try: os.unlink(self.socket_path) except OSError: raise NamedPipeException( 'Failed to remove existing unix socket {}'. format(self.socket_path) ) self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) make_socket_closeonexec(self.pipe) self.pipe.bind(self.socket_path) self.pipe.listen(10) logger.info('Server now listening at %s', self.socket_path) def accept_loop(self): logger.info('Waiting for clients') while True: connfd, _ = self.pipe.accept() logger.info('New pip client') t = PipeHandlerThread(connfd) t.start() class PipeHandlerThread(Thread): def __init__(self, pipe): Thread.__init__(self) self.setDaemon(True) self.pipe = pipe def run(self): while True: req_header = recvall(self.pipe, 4) # logger.info('Got req header %s', req_header) req_size, = struct.unpack('I', req_header) # logger.info('req size is %s', req_size) req = recvall(self.pipe, req_size) # logger.info('req is %s', req) data = json.loads(req.decode(encoding='utf-8')) resp = searpc_server.call_function(data['service'], data['request']) # logger.info('resp is %s', resp) resp_header = struct.pack('I', len(resp)) sendall(self.pipe, resp_header) sendall(self.pipe, resp.encode(encoding='utf-8'))