mirror of
https://github.com/haiwen/libsearpc.git
synced 2025-09-18 16:07:05 +00:00
[pysearpc] Improve code of named_pipe transport.
This commit is contained in:
@@ -40,33 +40,33 @@ class NamedPipeTransport(SearpcTransport):
|
||||
|
||||
def __init__(self, socket_path):
|
||||
self.socket_path = socket_path
|
||||
self.pipe_fd = None
|
||||
self.pipe = None
|
||||
|
||||
def connect(self):
|
||||
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
make_socket_closeonexec(self.pipe_fd)
|
||||
self.pipe_fd.connect(self.socket_path)
|
||||
self.pipe = socket.socket(socket.AF_UNIX)
|
||||
self.pipe.connect(self.socket_path)
|
||||
|
||||
def stop(self):
|
||||
if self.pipe_fd:
|
||||
self.pipe_fd.close()
|
||||
self.pipe_fd = None
|
||||
if self.pipe:
|
||||
self.pipe.close()
|
||||
self.pipe = None
|
||||
|
||||
def send(self, service, fcall_str):
|
||||
body = json.dumps({
|
||||
'service': service,
|
||||
'request': fcall_str,
|
||||
})
|
||||
body_utf8 = body.encode(encoding='utf-8')
|
||||
# "I" for unsiged int
|
||||
header = struct.pack('I', len(body))
|
||||
sendall(self.pipe_fd, header)
|
||||
sendall(self.pipe_fd, body.encode(encoding='utf-8'))
|
||||
header = struct.pack('=I', len(body_utf8))
|
||||
sendall(self.pipe, header)
|
||||
sendall(self.pipe, body_utf8)
|
||||
|
||||
resp_header = recvall(self.pipe_fd, 4)
|
||||
resp_header = recvall(self.pipe, 4)
|
||||
# logger.info('resp_header is %s', resp_header)
|
||||
resp_size, = struct.unpack('I', resp_header)
|
||||
resp_size, = struct.unpack('=I', resp_header)
|
||||
# logger.info('resp_size is %s', resp_size)
|
||||
resp = recvall(self.pipe_fd, resp_size)
|
||||
resp = recvall(self.pipe, resp_size)
|
||||
# logger.info('resp is %s', resp)
|
||||
return resp.decode(encoding='utf-8')
|
||||
|
||||
@@ -110,7 +110,7 @@ class NamedPipeServer(object):
|
||||
"""
|
||||
def __init__(self, socket_path):
|
||||
self.socket_path = socket_path
|
||||
self.pipe_fd = None
|
||||
self.pipe = None
|
||||
self.thread = Thread(target=self.accept_loop)
|
||||
self.thread.setDaemon(True)
|
||||
|
||||
@@ -130,34 +130,34 @@ class NamedPipeServer(object):
|
||||
'Failed to remove existing unix socket {}'.
|
||||
format(self.socket_path)
|
||||
)
|
||||
self.pipe_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
make_socket_closeonexec(self.pipe_fd)
|
||||
self.pipe_fd.bind(self.socket_path)
|
||||
self.pipe_fd.listen(10)
|
||||
self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
make_socket_closeonexec(self.pipe)
|
||||
self.pipe.bind(self.socket_path)
|
||||
self.pipe.listen(10)
|
||||
logger.info('Server now listening at %s', self.socket_path)
|
||||
|
||||
def accept_loop(self):
|
||||
logger.info('Waiting for clients')
|
||||
while True:
|
||||
connfd, _ = self.pipe_fd.accept()
|
||||
connfd, _ = self.pipe.accept()
|
||||
logger.info('New pip client')
|
||||
t = PipeHandlerThread(connfd)
|
||||
t.start()
|
||||
|
||||
|
||||
class PipeHandlerThread(Thread):
|
||||
def __init__(self, pipe_fd):
|
||||
def __init__(self, pipe):
|
||||
Thread.__init__(self)
|
||||
self.setDaemon(True)
|
||||
self.pipe_fd = pipe_fd
|
||||
self.pipe = pipe
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
req_header = recvall(self.pipe_fd, 4)
|
||||
req_header = recvall(self.pipe, 4)
|
||||
# logger.info('Got req header %s', req_header)
|
||||
req_size, = struct.unpack('I', req_header)
|
||||
# logger.info('req size is %s', req_size)
|
||||
req = recvall(self.pipe_fd, req_size)
|
||||
req = recvall(self.pipe, req_size)
|
||||
# logger.info('req is %s', req)
|
||||
|
||||
data = json.loads(req.decode(encoding='utf-8'))
|
||||
@@ -165,5 +165,5 @@ class PipeHandlerThread(Thread):
|
||||
# logger.info('resp is %s', resp)
|
||||
|
||||
resp_header = struct.pack('I', len(resp))
|
||||
sendall(self.pipe_fd, resp_header)
|
||||
sendall(self.pipe_fd, resp.encode(encoding='utf-8'))
|
||||
sendall(self.pipe, resp_header)
|
||||
sendall(self.pipe, resp.encode(encoding='utf-8'))
|
||||
|
Reference in New Issue
Block a user